]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/dpdk/TCP.h
update sources to v12.1.0
[ceph.git] / ceph / src / msg / async / dpdk / TCP.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2/*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19/*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
22
23#ifndef CEPH_DPDK_TCP_H_
24#define CEPH_DPDK_TCP_H_
25
26#include <unordered_map>
27#include <map>
28#include <queue>
29#include <functional>
30#include <deque>
31#include <chrono>
32#include <random>
33#include <stdexcept>
34#include <system_error>
35
36#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
37#include <cryptopp/md5.h>
38
39#include "msg/async/dpdk/EventDPDK.h"
40
41#include "include/utime.h"
42#include "common/Throttle.h"
43#include "common/ceph_time.h"
44#include "msg/async/Event.h"
45#include "IPChecksum.h"
46#include "IP.h"
47#include "const.h"
48#include "byteorder.h"
49#include "shared_ptr.h"
50#include "PacketUtil.h"
51
52struct tcp_hdr;
53
// TCP connection states.  Each state is a distinct bit of a 16-bit value so
// that several states can be combined into one mask and tested together.
enum class tcp_state : uint16_t {
  CLOSED       = 1 << 0,
  LISTEN       = 1 << 1,
  SYN_SENT     = 1 << 2,
  SYN_RECEIVED = 1 << 3,
  ESTABLISHED  = 1 << 4,
  FIN_WAIT_1   = 1 << 5,
  FIN_WAIT_2   = 1 << 6,
  CLOSE_WAIT   = 1 << 7,
  CLOSING      = 1 << 8,
  LAST_ACK     = 1 << 9,
  TIME_WAIT    = 1 << 10
};
67
68inline tcp_state operator|(tcp_state s1, tcp_state s2) {
69 return tcp_state(uint16_t(s1) | uint16_t(s2));
70}
71
31f18b77 72inline std::ostream & operator<<(std::ostream & str, const tcp_state& s) {
7c673cae
FG
73 switch (s) {
74 case tcp_state::CLOSED: return str << "CLOSED";
75 case tcp_state::LISTEN: return str << "LISTEN";
76 case tcp_state::SYN_SENT: return str << "SYN_SENT";
77 case tcp_state::SYN_RECEIVED: return str << "SYN_RECEIVED";
78 case tcp_state::ESTABLISHED: return str << "ESTABLISHED";
79 case tcp_state::FIN_WAIT_1: return str << "FIN_WAIT_1";
80 case tcp_state::FIN_WAIT_2: return str << "FIN_WAIT_2";
81 case tcp_state::CLOSE_WAIT: return str << "CLOSE_WAIT";
82 case tcp_state::CLOSING: return str << "CLOSING";
83 case tcp_state::LAST_ACK: return str << "LAST_ACK";
84 case tcp_state::TIME_WAIT: return str << "TIME_WAIT";
85 default: return str << "UNKNOWN";
86 }
87}
88
89struct tcp_option {
90 // The kind and len field are fixed and defined in TCP protocol
91 enum class option_kind: uint8_t { mss = 2, win_scale = 3, sack = 4, timestamps = 8, nop = 1, eol = 0 };
92 enum class option_len: uint8_t { mss = 4, win_scale = 3, sack = 2, timestamps = 10, nop = 1, eol = 1 };
93 struct mss {
94 option_kind kind = option_kind::mss;
95 option_len len = option_len::mss;
96 uint16_t mss;
97 struct mss hton() {
98 struct mss m = *this;
99 m.mss = ::hton(m.mss);
100 return m;
101 }
102 } __attribute__((packed));
103 struct win_scale {
104 option_kind kind = option_kind::win_scale;
105 option_len len = option_len::win_scale;
106 uint8_t shift;
107 } __attribute__((packed));
108 struct sack {
109 option_kind kind = option_kind::sack;
110 option_len len = option_len::sack;
111 } __attribute__((packed));
112 struct timestamps {
113 option_kind kind = option_kind::timestamps;
114 option_len len = option_len::timestamps;
115 uint32_t t1;
116 uint32_t t2;
117 } __attribute__((packed));
118 struct nop {
119 option_kind kind = option_kind::nop;
120 } __attribute__((packed));
121 struct eol {
122 option_kind kind = option_kind::eol;
123 } __attribute__((packed));
124 static const uint8_t align = 4;
125
126 void parse(uint8_t* beg, uint8_t* end);
127 uint8_t fill(tcp_hdr* th, uint8_t option_size);
128 uint8_t get_size(bool syn_on, bool ack_on);
129
130 // For option negotiattion
131 bool _mss_received = false;
132 bool _win_scale_received = false;
133 bool _timestamps_received = false;
134 bool _sack_received = false;
135
136 // Option data
137 uint16_t _remote_mss = 536;
138 uint16_t _local_mss;
139 uint8_t _remote_win_scale = 0;
140 uint8_t _local_win_scale = 0;
141};
142inline uint8_t*& operator+=(uint8_t*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
143inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
144
// Strongly typed wrapper around a raw 32-bit TCP sequence number.
struct tcp_sequence {
  // The sequence number value itself.
  uint32_t raw;
};
148
149tcp_sequence ntoh(tcp_sequence ts) {
150 return tcp_sequence { ::ntoh(ts.raw) };
151}
152
153tcp_sequence hton(tcp_sequence ts) {
154 return tcp_sequence { ::hton(ts.raw) };
155}
156
31f18b77 157inline std::ostream& operator<<(std::ostream& os, const tcp_sequence& s) {
7c673cae
FG
158 return os << s.raw;
159}
160
161inline tcp_sequence make_seq(uint32_t raw) { return tcp_sequence{raw}; }
162inline tcp_sequence& operator+=(tcp_sequence& s, int32_t n) { s.raw += n; return s; }
163inline tcp_sequence& operator-=(tcp_sequence& s, int32_t n) { s.raw -= n; return s; }
164inline tcp_sequence operator+(tcp_sequence s, int32_t n) { return s += n; }
165inline tcp_sequence operator-(tcp_sequence s, int32_t n) { return s -= n; }
166inline int32_t operator-(tcp_sequence s, tcp_sequence q) { return s.raw - q.raw; }
167inline bool operator==(tcp_sequence s, tcp_sequence q) { return s.raw == q.raw; }
168inline bool operator!=(tcp_sequence s, tcp_sequence q) { return !(s == q); }
169inline bool operator<(tcp_sequence s, tcp_sequence q) { return s - q < 0; }
170inline bool operator>(tcp_sequence s, tcp_sequence q) { return q < s; }
171inline bool operator<=(tcp_sequence s, tcp_sequence q) { return !(s > q); }
172inline bool operator>=(tcp_sequence s, tcp_sequence q) { return !(s < q); }
173
174struct tcp_hdr {
175 uint16_t src_port;
176 uint16_t dst_port;
177 tcp_sequence seq;
178 tcp_sequence ack;
179 uint8_t rsvd1 : 4;
180 uint8_t data_offset : 4;
181 uint8_t f_fin : 1;
182 uint8_t f_syn : 1;
183 uint8_t f_rst : 1;
184 uint8_t f_psh : 1;
185 uint8_t f_ack : 1;
186 uint8_t f_urg : 1;
187 uint8_t rsvd2 : 2;
188 uint16_t window;
189 uint16_t checksum;
190 uint16_t urgent;
191
192 tcp_hdr hton() {
193 tcp_hdr hdr = *this;
194 hdr.src_port = ::hton(src_port);
195 hdr.dst_port = ::hton(dst_port);
196 hdr.seq = ::hton(seq);
197 hdr.ack = ::hton(ack);
198 hdr.window = ::hton(window);
199 hdr.checksum = ::hton(checksum);
200 hdr.urgent = ::hton(urgent);
201 return hdr;
202 }
203
204 tcp_hdr ntoh() {
205 tcp_hdr hdr = *this;
206 hdr.src_port = ::ntoh(src_port);
207 hdr.dst_port = ::ntoh(dst_port);
208 hdr.seq = ::ntoh(seq);
209 hdr.ack = ::ntoh(ack);
210 hdr.window = ::ntoh(window);
211 hdr.checksum = ::ntoh(checksum);
212 hdr.urgent = ::ntoh(urgent);
213 return hdr;
214 }
215} __attribute__((packed));
216
217struct tcp_tag {};
218using tcp_packet_merger = packet_merger<tcp_sequence, tcp_tag>;
219
220template <typename InetTraits>
221class tcp {
222 public:
223 using ipaddr = typename InetTraits::address_type;
224 using inet_type = typename InetTraits::inet_type;
225 using connid = l4connid<InetTraits>;
226 using connid_hash = typename connid::connid_hash;
227 class connection;
228 class listener;
229 private:
230 class tcb;
231
232 class C_handle_delayed_ack : public EventCallback {
233 tcb *tc;
234
235 public:
236 C_handle_delayed_ack(tcb *t): tc(t) { }
237 void do_request(int r) {
238 tc->_nr_full_seg_received = 0;
239 tc->output();
240 }
241 };
242
243 class C_handle_retransmit : public EventCallback {
244 tcb *tc;
245
246 public:
247 C_handle_retransmit(tcb *t): tc(t) { }
248 void do_request(int r) {
249 tc->retransmit();
250 }
251 };
252
253 class C_handle_persist : public EventCallback {
254 tcb *tc;
255
256 public:
257 C_handle_persist(tcb *t): tc(t) { }
258 void do_request(int r) {
259 tc->persist();
260 }
261 };
262
263 class C_all_data_acked : public EventCallback {
264 tcb *tc;
265
266 public:
267 C_all_data_acked(tcb *t): tc(t) {}
268 void do_request(int fd_or_id) {
269 tc->close_final_cleanup();
270 }
271 };
272
273 class C_actual_remove_tcb : public EventCallback {
274 lw_shared_ptr<tcb> tc;
275 public:
276 C_actual_remove_tcb(tcb *t): tc(t->shared_from_this()) {}
277 void do_request(int r) {
278 delete this;
279 }
280 };
281
282 class tcb : public enable_lw_shared_from_this<tcb> {
283 using clock_type = ceph::coarse_real_clock;
284 static constexpr tcp_state CLOSED = tcp_state::CLOSED;
285 static constexpr tcp_state LISTEN = tcp_state::LISTEN;
286 static constexpr tcp_state SYN_SENT = tcp_state::SYN_SENT;
287 static constexpr tcp_state SYN_RECEIVED = tcp_state::SYN_RECEIVED;
288 static constexpr tcp_state ESTABLISHED = tcp_state::ESTABLISHED;
289 static constexpr tcp_state FIN_WAIT_1 = tcp_state::FIN_WAIT_1;
290 static constexpr tcp_state FIN_WAIT_2 = tcp_state::FIN_WAIT_2;
291 static constexpr tcp_state CLOSE_WAIT = tcp_state::CLOSE_WAIT;
292 static constexpr tcp_state CLOSING = tcp_state::CLOSING;
293 static constexpr tcp_state LAST_ACK = tcp_state::LAST_ACK;
294 static constexpr tcp_state TIME_WAIT = tcp_state::TIME_WAIT;
295 tcp_state _state = CLOSED;
296 tcp& _tcp;
297 UserspaceEventManager &manager;
298 connection* _conn = nullptr;
299 bool _connect_done = false;
300 ipaddr _local_ip;
301 ipaddr _foreign_ip;
302 uint16_t _local_port;
303 uint16_t _foreign_port;
304 struct unacked_segment {
305 Packet p;
306 uint16_t data_len;
307 unsigned nr_transmits;
308 clock_type::time_point tx_time;
309 };
310 struct send {
311 tcp_sequence unacknowledged;
312 tcp_sequence next;
313 uint32_t window;
314 uint8_t window_scale;
315 uint16_t mss;
316 tcp_sequence urgent;
317 tcp_sequence wl1;
318 tcp_sequence wl2;
319 tcp_sequence initial;
320 std::deque<unacked_segment> data;
321 std::deque<Packet> unsent;
322 uint32_t unsent_len = 0;
323 uint32_t queued_len = 0;
324 bool closed = false;
325 // Wait for all data are acked
326 int _all_data_acked_fd = -1;
327 // Limit number of data queued into send queue
328 Throttle user_queue_space;
329 // Round-trip time variation
330 std::chrono::microseconds rttvar;
331 // Smoothed round-trip time
332 std::chrono::microseconds srtt;
333 bool first_rto_sample = true;
334 clock_type::time_point syn_tx_time;
335 // Congestion window
336 uint32_t cwnd;
337 // Slow start threshold
338 uint32_t ssthresh;
339 // Duplicated ACKs
340 uint16_t dupacks = 0;
341 unsigned syn_retransmit = 0;
342 unsigned fin_retransmit = 0;
343 uint32_t limited_transfer = 0;
344 uint32_t partial_ack = 0;
345 tcp_sequence recover;
346 bool window_probe = false;
347 send(CephContext *c): user_queue_space(c, "DPDK::tcp::tcb::user_queue_space", 81920) {}
348 } _snd;
349 struct receive {
350 tcp_sequence next;
351 uint32_t window;
352 uint8_t window_scale;
353 uint16_t mss;
354 tcp_sequence urgent;
355 tcp_sequence initial;
356 std::deque<Packet> data;
357 tcp_packet_merger out_of_order;
358 } _rcv;
359 EventCenter *center;
360 int fd;
361 // positive means no errno, 0 means eof, nagetive means error
362 int16_t _errno = 1;
363 tcp_option _option;
364 EventCallbackRef delayed_ack_event;
365 Tub<uint64_t> _delayed_ack_fd;
366 // Retransmission timeout
367 std::chrono::microseconds _rto{1000*1000};
368 std::chrono::microseconds _persist_time_out{1000*1000};
369 static constexpr std::chrono::microseconds _rto_min{1000*1000};
370 static constexpr std::chrono::microseconds _rto_max{60000*1000};
371 // Clock granularity
372 static constexpr std::chrono::microseconds _rto_clk_granularity{1000};
373 static constexpr uint16_t _max_nr_retransmit{5};
374 EventCallbackRef retransmit_event;
375 Tub<uint64_t> retransmit_fd;
376 EventCallbackRef persist_event;
377 EventCallbackRef all_data_ack_event;
378 Tub<uint64_t> persist_fd;
379 uint16_t _nr_full_seg_received = 0;
380 struct isn_secret {
381 // 512 bits secretkey for ISN generating
382 uint32_t key[16];
383 isn_secret () {
384 std::random_device rd;
385 std::default_random_engine e(rd());
386 std::uniform_int_distribution<uint32_t> dist{};
387 for (auto& k : key) {
388 k = dist(e);
389 }
390 }
391 };
392 static isn_secret _isn_secret;
393 tcp_sequence get_isn();
394 circular_buffer<typename InetTraits::l4packet> _packetq;
395 bool _poll_active = false;
396 public:
397 // callback
398 void close_final_cleanup();
399 ostream& _prefix(std::ostream *_dout);
400
401 public:
402 tcb(tcp& t, connid id);
403 ~tcb();
404 void input_handle_listen_state(tcp_hdr* th, Packet p);
405 void input_handle_syn_sent_state(tcp_hdr* th, Packet p);
406 void input_handle_other_state(tcp_hdr* th, Packet p);
407 void output_one(bool data_retransmit = false);
408 bool is_all_data_acked();
409 int send(Packet p);
410 void connect();
411 Tub<Packet> read();
412 void close();
413 void remove_from_tcbs() {
414 auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
415 _tcp._tcbs.erase(id);
416 }
417 Tub<typename InetTraits::l4packet> get_packet();
418 void output() {
419 if (!_poll_active) {
420 _poll_active = true;
421
422 auto tcb = this->shared_from_this();
423 _tcp._inet.wait_l2_dst_address(_foreign_ip, Packet(), [tcb] (const ethernet_address &dst, Packet p, int r) {
424 if (r == 0) {
425 tcb->_tcp.poll_tcb(dst, std::move(tcb));
426 } else if (r == -ETIMEDOUT) {
427 // in other states connection should time out
428 if (tcb->in_state(SYN_SENT)) {
429 tcb->_errno = -ETIMEDOUT;
430 tcb->cleanup();
431 }
432 } else if (r == -EBUSY) {
433 // retry later
434 tcb->_poll_active = false;
435 tcb->start_retransmit_timer();
436 }
437 });
438 }
439 }
440
441 int16_t get_errno() const {
442 return _errno;
443 }
444
445 tcp_state& state() {
446 return _state;
447 }
448
449 uint64_t peek_sent_available() {
450 if (!in_state(ESTABLISHED))
451 return 0;
452 uint64_t left = _snd.user_queue_space.get_max() - _snd.user_queue_space.get_current();
453 return left;
454 }
455
456 int is_connected() const {
457 if (_errno <= 0)
458 return _errno;
459 return _connect_done;
460 }
461
462 private:
463 void respond_with_reset(tcp_hdr* th);
464 bool merge_out_of_order();
465 void insert_out_of_order(tcp_sequence seq, Packet p);
466 void trim_receive_data_after_window();
467 bool should_send_ack(uint16_t seg_len);
468 void clear_delayed_ack();
469 Packet get_transmit_packet();
470 void retransmit_one() {
471 bool data_retransmit = true;
472 output_one(data_retransmit);
473 }
474 void start_retransmit_timer() {
475 if (retransmit_fd)
476 center->delete_time_event(*retransmit_fd);
477 retransmit_fd.construct(center->create_time_event(_rto.count(), retransmit_event));
478 };
479 void stop_retransmit_timer() {
480 if (retransmit_fd) {
481 center->delete_time_event(*retransmit_fd);
482 retransmit_fd.destroy();
483 }
484 };
485 void start_persist_timer() {
486 if (persist_fd)
487 center->delete_time_event(*persist_fd);
488 persist_fd.construct(center->create_time_event(_persist_time_out.count(), persist_event));
489 };
490 void stop_persist_timer() {
491 if (persist_fd) {
492 center->delete_time_event(*persist_fd);
493 persist_fd.destroy();
494 }
495 };
496 void persist();
497 void retransmit();
498 void fast_retransmit();
499 void update_rto(clock_type::time_point tx_time);
500 void update_cwnd(uint32_t acked_bytes);
501 void cleanup();
502 uint32_t can_send() {
503 if (_snd.window_probe) {
504 return 1;
505 }
506 // Can not send more than advertised window allows
507 auto x = std::min(uint32_t(_snd.unacknowledged + _snd.window - _snd.next), _snd.unsent_len);
508 // Can not send more than congestion window allows
509 x = std::min(_snd.cwnd, x);
510 if (_snd.dupacks == 1 || _snd.dupacks == 2) {
511 // RFC5681 Step 3.1
512 // Send cwnd + 2 * smss per RFC3042
513 auto flight = flight_size();
514 auto max = _snd.cwnd + 2 * _snd.mss;
515 x = flight <= max ? std::min(x, max - flight) : 0;
516 _snd.limited_transfer += x;
517 } else if (_snd.dupacks >= 3) {
518 // RFC5681 Step 3.5
519 // Sent 1 full-sized segment at most
520 x = std::min(uint32_t(_snd.mss), x);
521 }
522 return x;
523 }
524 uint32_t flight_size() {
525 uint32_t size = 0;
526 std::for_each(_snd.data.begin(), _snd.data.end(),
527 [&] (unacked_segment& seg) { size += seg.p.len(); });
528 return size;
529 }
530 uint16_t local_mss() {
531 return _tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
532 }
533 void queue_packet(Packet p) {
534 _packetq.emplace_back(
535 typename InetTraits::l4packet{_foreign_ip, std::move(p)});
536 }
537 void signal_data_received() {
538 manager.notify(fd, EVENT_READABLE);
539 }
540 void signal_all_data_acked() {
541 if (_snd._all_data_acked_fd >= 0 && _snd.unsent_len == 0 && _snd.queued_len == 0)
542 manager.notify(_snd._all_data_acked_fd, EVENT_READABLE);
543 }
544 void do_syn_sent() {
545 _state = SYN_SENT;
546 _snd.syn_tx_time = clock_type::now();
547 // Send <SYN> to remote
548 output();
549 }
550 void do_syn_received() {
551 _state = SYN_RECEIVED;
552 _snd.syn_tx_time = clock_type::now();
553 // Send <SYN,ACK> to remote
554 output();
555 }
556 void do_established() {
557 _state = ESTABLISHED;
558 update_rto(_snd.syn_tx_time);
559 _connect_done = true;
560 manager.notify(fd, EVENT_READABLE|EVENT_WRITABLE);
561 }
562 void do_reset() {
563 _state = CLOSED;
564 // Free packets to be sent which are waiting for user_queue_space
565 _snd.user_queue_space.reset();
566 cleanup();
567 _errno = -ECONNRESET;
568 manager.notify(fd, EVENT_READABLE);
569
570 if (_snd._all_data_acked_fd >= 0)
571 manager.notify(_snd._all_data_acked_fd, EVENT_READABLE);
572 }
573 void do_time_wait() {
574 // FIXME: Implement TIME_WAIT state timer
575 _state = TIME_WAIT;
576 cleanup();
577 }
578 void do_closed() {
579 _state = CLOSED;
580 cleanup();
581 }
582 void do_setup_isn() {
583 _snd.initial = get_isn();
584 _snd.unacknowledged = _snd.initial;
585 _snd.next = _snd.initial + 1;
586 _snd.recover = _snd.initial;
587 }
588 void do_local_fin_acked() {
589 _snd.unacknowledged += 1;
590 _snd.next += 1;
591 }
592 bool syn_needs_on() {
593 return in_state(SYN_SENT | SYN_RECEIVED);
594 }
595 bool fin_needs_on() {
596 return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed &&
597 _snd.unsent_len == 0 && _snd.queued_len == 0;
598 }
599 bool ack_needs_on() {
600 return !in_state(CLOSED | LISTEN | SYN_SENT);
601 }
602 bool foreign_will_not_send() {
603 return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED);
604 }
605 bool in_state(tcp_state state) {
606 return uint16_t(_state) & uint16_t(state);
607 }
608 void exit_fast_recovery() {
609 _snd.dupacks = 0;
610 _snd.limited_transfer = 0;
611 _snd.partial_ack = 0;
612 }
613 uint32_t data_segment_acked(tcp_sequence seg_ack);
614 bool segment_acceptable(tcp_sequence seg_seq, unsigned seg_len);
615 void init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end);
616 friend class connection;
617
618 friend class C_handle_delayed_ack;
619 friend class C_handle_retransmit;
620 friend class C_handle_persist;
621 friend class C_all_data_acked;
622 };
623
624 CephContext *cct;
625 // ipv4_l4<ip_protocol_num::tcp>
626 inet_type& _inet;
627 EventCenter *center;
628 UserspaceEventManager &manager;
629 std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs;
630 std::unordered_map<uint16_t, listener*> _listening;
631 std::random_device _rd;
632 std::default_random_engine _e;
633 std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
634 circular_buffer<std::pair<lw_shared_ptr<tcb>, ethernet_address>> _poll_tcbs;
635 // queue for packets that do not belong to any tcb
636 circular_buffer<ipv4_traits::l4packet> _packetq;
637 Throttle _queue_space;
638 // Limit number of data queued into send queue
639 public:
640 class connection {
641 lw_shared_ptr<tcb> _tcb;
642 public:
643 explicit connection(lw_shared_ptr<tcb> tcbp) : _tcb(std::move(tcbp)) { _tcb->_conn = this; }
644 connection(const connection&) = delete;
645 connection(connection&& x) noexcept : _tcb(std::move(x._tcb)) {
646 _tcb->_conn = this;
647 }
648 ~connection();
649 void operator=(const connection&) = delete;
650 connection& operator=(connection&& x) {
651 if (this != &x) {
652 this->~connection();
653 new (this) connection(std::move(x));
654 }
655 return *this;
656 }
657 int fd() const {
658 return _tcb->fd;
659 }
660 int send(Packet p) {
661 return _tcb->send(std::move(p));
662 }
663 Tub<Packet> read() {
664 return _tcb->read();
665 }
666 int16_t get_errno() const {
667 return _tcb->get_errno();
668 }
669 void close_read();
670 void close_write();
671 entity_addr_t remote_addr() const {
672 entity_addr_t addr;
673 auto net_ip = _tcb->_foreign_ip.hton();
674 memcpy((void*)&addr.in4_addr().sin_addr.s_addr,
675 &net_ip, sizeof(addr.in4_addr().sin_addr.s_addr));
676 addr.set_family(AF_INET);
677 return addr;
678 }
679 uint64_t peek_sent_available() {
680 return _tcb->peek_sent_available();
681 }
682 int is_connected() const { return _tcb->is_connected(); }
683 };
684 class listener {
685 tcp& _tcp;
686 uint16_t _port;
687 int _fd = -1;
688 int16_t _errno;
689 queue<connection> _q;
690 size_t _q_max_length;
691
692 private:
693 listener(tcp& t, uint16_t port, size_t queue_length)
694 : _tcp(t), _port(port), _errno(0), _q(), _q_max_length(queue_length) {
695 }
696 public:
697 listener(const listener&) = delete;
698 void operator=(const listener&) = delete;
699 listener(listener&& x)
700 : _tcp(x._tcp), _port(x._port), _fd(std::move(x._fd)), _errno(x._errno),
701 _q(std::move(x._q)) {
702 if (_fd >= 0)
703 _tcp._listening[_port] = this;
704 }
705 ~listener() {
706 abort_accept();
707 }
708 int listen() {
709 if (_tcp._listening.find(_port) != _tcp._listening.end())
710 return -EADDRINUSE;
711 _tcp._listening.emplace(_port, this);
712 _fd = _tcp.manager.get_eventfd();
713 return 0;
714 }
715 Tub<connection> accept() {
716 Tub<connection> c;
717 if (!_q.empty()) {
718 c = std::move(_q.front());
719 _q.pop();
720 }
721 return c;
722 }
723 void abort_accept() {
724 while (!_q.empty())
725 _q.pop();
726 if (_fd >= 0) {
727 _tcp._listening.erase(_port);
728 _tcp.manager.close(_fd);
729 _fd = -1;
730 }
731 }
732 int16_t get_errno() const {
733 return _errno;
734 }
735 bool full() const {
736 return _q.size() == _q_max_length;
737 }
738 int fd() const {
739 return _fd;
740 }
741 friend class tcp;
742 };
743 public:
744 explicit tcp(CephContext *c, inet_type& inet, EventCenter *cen);
745 void received(Packet p, ipaddr from, ipaddr to);
746 bool forward(forward_hash& out_hash_data, Packet& p, size_t off);
747 listener listen(uint16_t port, size_t queue_length = 100);
748 connection connect(const entity_addr_t &addr);
749 const hw_features& get_hw_features() const { return _inet._inet.get_hw_features(); }
750 void poll_tcb(const ethernet_address &dst, lw_shared_ptr<tcb> tcb) {
751 _poll_tcbs.emplace_back(std::move(tcb), dst);
752 }
753 bool push_listen_queue(uint16_t port, tcb *t) {
754 auto listener = _listening.find(port);
755 if (listener == _listening.end() || listener->second->full()) {
756 return false;
757 }
758 listener->second->_q.push(connection(t->shared_from_this()));
759 manager.notify(listener->second->_fd, EVENT_READABLE);
760 return true;
761 }
762
763 private:
764 void send_packet_without_tcb(ipaddr from, ipaddr to, Packet p);
765 void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
766 friend class listener;
767};
768
769template <typename InetTraits>
770tcp<InetTraits>::tcp(CephContext *c, inet_type& inet, EventCenter *cen)
771 : cct(c), _inet(inet), center(cen),
772 manager(static_cast<DPDKDriver*>(cen->get_driver())->manager),
773 _e(_rd()), _queue_space(cct, "DPDK::tcp::queue_space", 81920) {
774 int tcb_polled = 0u;
775 _inet.register_packet_provider([this, tcb_polled] () mutable {
776 Tub<typename InetTraits::l4packet> l4p;
777 auto c = _poll_tcbs.size();
778 if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
779 l4p = std::move(_packetq.front());
780 _packetq.pop_front();
781 _queue_space.put(l4p->p.len());
782 } else {
783 while (c--) {
784 tcb_polled++;
785 lw_shared_ptr<tcb> tcb;
786 ethernet_address dst;
787 std::tie(tcb, dst) = std::move(_poll_tcbs.front());
788 _poll_tcbs.pop_front();
789 l4p = std::move(tcb->get_packet());
790 if (l4p) {
791 l4p->e_dst = dst;
792 break;
793 }
794 }
795 }
796 return l4p;
797 });
798}
799
800template <typename InetTraits>
801auto tcp<InetTraits>::listen(uint16_t port, size_t queue_length) -> listener {
802 return listener(*this, port, queue_length);
803}
804
805template <typename InetTraits>
806typename tcp<InetTraits>::connection tcp<InetTraits>::connect(const entity_addr_t &addr) {
807 uint16_t src_port;
808 connid id;
809 auto src_ip = _inet._inet.host_address();
810 auto dst_ip = ipv4_address(addr);
811 auto dst_port = addr.get_port();
812
813 do {
814 src_port = _port_dist(_e);
815 id = connid{src_ip, dst_ip, src_port, (uint16_t)dst_port};
816 if (_tcbs.find(id) == _tcbs.end()) {
817 if (_inet._inet.netif()->hw_queues_count() == 1 ||
818 _inet._inet.netif()->hash2cpu(
819 id.hash(_inet._inet.netif()->rss_key())) == center->get_id())
820 break;
821 }
822 } while (true);
823
824 auto tcbp = make_lw_shared<tcb>(*this, id);
825 _tcbs.insert({id, tcbp});
826 tcbp->connect();
827 return connection(tcbp);
828}
829
830template <typename InetTraits>
831bool tcp<InetTraits>::forward(forward_hash& out_hash_data, Packet& p, size_t off) {
832 auto th = p.get_header<tcp_hdr>(off);
833 if (th) {
834 out_hash_data.push_back(th->src_port);
835 out_hash_data.push_back(th->dst_port);
836 }
837 return true;
838}
839
840template <typename InetTraits>
841void tcp<InetTraits>::received(Packet p, ipaddr from, ipaddr to) {
842 auto th = p.get_header<tcp_hdr>(0);
843 if (!th) {
844 return;
845 }
846 // th->data_offset is correct even before ntoh()
847 if (unsigned(th->data_offset * 4) < sizeof(*th)) {
848 return;
849 }
850
851 if (!get_hw_features().rx_csum_offload) {
852 checksummer csum;
853 InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
854 csum.sum(p);
855 if (csum.get() != 0) {
856 return;
857 }
858 }
859 auto h = th->ntoh();
860 auto id = connid{to, from, h.dst_port, h.src_port};
861 auto tcbi = _tcbs.find(id);
862 lw_shared_ptr<tcb> tcbp;
863 if (tcbi == _tcbs.end()) {
864 auto listener = _listening.find(id.local_port);
865 if (listener == _listening.end() || listener->second->full()) {
866 // 1) In CLOSE state
867 // 1.1 all data in the incoming segment is discarded. An incoming
868 // segment containing a RST is discarded. An incoming segment not
869 // containing a RST causes a RST to be sent in response.
870 // FIXME:
871 // if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK>
872 // if ACK on: <SEQ=SEG.ACK><CTL=RST>
873 return respond_with_reset(&h, id.local_ip, id.foreign_ip);
874 } else {
875 // 2) In LISTEN state
876 // 2.1 first check for an RST
877 if (h.f_rst) {
878 // An incoming RST should be ignored
879 return;
880 }
881 // 2.2 second check for an ACK
882 if (h.f_ack) {
883 // Any acknowledgment is bad if it arrives on a connection
884 // still in the LISTEN state.
885 // <SEQ=SEG.ACK><CTL=RST>
886 return respond_with_reset(&h, id.local_ip, id.foreign_ip);
887 }
888 // 2.3 third check for a SYN
889 if (h.f_syn) {
890 // check the security
891 // NOTE: Ignored for now
892 tcbp = make_lw_shared<tcb>(*this, id);
893 _tcbs.insert({id, tcbp});
894 return tcbp->input_handle_listen_state(&h, std::move(p));
895 }
896 // 2.4 fourth other text or control
897 // So you are unlikely to get here, but if you do, drop the
898 // segment, and return.
899 return;
900 }
901 } else {
902 tcbp = tcbi->second;
903 if (tcbp->state() == tcp_state::SYN_SENT) {
904 // 3) In SYN_SENT State
905 return tcbp->input_handle_syn_sent_state(&h, std::move(p));
906 } else {
907 // 4) In other state, can be one of the following:
908 // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
909 // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
910 return tcbp->input_handle_other_state(&h, std::move(p));
911 }
912 }
913}
914
915// Send packet does not belong to any tcb
916template <typename InetTraits>
917void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, Packet p) {
918 if (_queue_space.get_or_fail(p.len())) { // drop packets that do not fit the queue
919 _inet.wait_l2_dst_address(to, std::move(p), [this, to] (const ethernet_address &e_dst, Packet p, int r) mutable {
920 if (r == 0)
921 _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
922 });
923 }
924}
925
926template <typename InetTraits>
927tcp<InetTraits>::connection::~connection() {
928 if (_tcb) {
929 _tcb->_conn = nullptr;
930 close_read();
931 close_write();
932 }
933}
934
935template <typename InetTraits>
936tcp<InetTraits>::tcb::tcb(tcp& t, connid id)
937 : _tcp(t), manager(t.manager), _local_ip(id.local_ip) , _foreign_ip(id.foreign_ip),
938 _local_port(id.local_port), _foreign_port(id.foreign_port),
939 _snd(_tcp.cct),
940 center(t.center),
941 fd(t.manager.get_eventfd()),
942 delayed_ack_event(new tcp<InetTraits>::C_handle_delayed_ack(this)),
943 retransmit_event(new tcp<InetTraits>::C_handle_retransmit(this)),
944 persist_event(new tcp<InetTraits>::C_handle_persist(this)),
945 all_data_ack_event(new tcp<InetTraits>::C_all_data_acked(this)) {}
946
947template <typename InetTraits>
948tcp<InetTraits>::tcb::~tcb()
949{
950 if (_delayed_ack_fd)
951 center->delete_time_event(*_delayed_ack_fd);
952 if (retransmit_fd)
953 center->delete_time_event(*retransmit_fd);
954 if (persist_fd)
955 center->delete_time_event(*persist_fd);
956 delete delayed_ack_event;
957 delete retransmit_event;
958 delete persist_event;
959 delete all_data_ack_event;
960 manager.close(fd);
961 fd = -1;
962}
963
964template <typename InetTraits>
965void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth)
966{
967 _tcp.respond_with_reset(rth, _local_ip, _foreign_ip);
968}
969
970template <typename InetTraits>
971uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_sequence seg_ack) {
972 uint32_t total_acked_bytes = 0;
973 // Full ACK of segment
974 while (!_snd.data.empty()
975 && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) {
976 auto acked_bytes = _snd.data.front().p.len();
977 _snd.unacknowledged += acked_bytes;
978 // Ignore retransmitted segments when setting the RTO
979 if (_snd.data.front().nr_transmits == 0) {
980 update_rto(_snd.data.front().tx_time);
981 }
982 update_cwnd(acked_bytes);
983 total_acked_bytes += acked_bytes;
984 _snd.user_queue_space.put(_snd.data.front().data_len);
985 manager.notify(fd, EVENT_WRITABLE);
986 _snd.data.pop_front();
987 }
988 // Partial ACK of segment
989 if (_snd.unacknowledged < seg_ack) {
990 auto acked_bytes = seg_ack - _snd.unacknowledged;
991 if (!_snd.data.empty()) {
992 auto& unacked_seg = _snd.data.front();
993 unacked_seg.p.trim_front(acked_bytes);
994 }
995 _snd.unacknowledged = seg_ack;
996 update_cwnd(acked_bytes);
997 total_acked_bytes += acked_bytes;
998 }
999 return total_acked_bytes;
1000}
1001
1002template <typename InetTraits>
1003bool tcp<InetTraits>::tcb::segment_acceptable(tcp_sequence seg_seq, unsigned seg_len) {
1004 if (seg_len == 0 && _rcv.window == 0) {
1005 // SEG.SEQ = RCV.NXT
1006 return seg_seq == _rcv.next;
1007 } else if (seg_len == 0 && _rcv.window > 0) {
1008 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1009 return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window);
1010 } else if (seg_len > 0 && _rcv.window > 0) {
1011 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1012 // or
1013 // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
1014 bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window);
1015 bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window);
1016 return x || y;
1017 } else {
1018 // SEG.LEN > 0 RCV.WND = 0, not acceptable
1019 return false;
1020 }
1021}
1022
1023template <typename InetTraits>
1024void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) {
1025 // Handle tcp options
1026 _option.parse(opt_start, opt_end);
1027
1028 // Remote receive window scale factor
1029 _snd.window_scale = _option._remote_win_scale;
1030 // Local receive window scale factor
1031 _rcv.window_scale = _option._local_win_scale;
1032
1033 // Maximum segment size remote can receive
1034 _snd.mss = _option._remote_mss;
1035 // Maximum segment size local can receive
1036 _rcv.mss = _option._local_mss = local_mss();
1037
1038 // Linux's default window size
1039 _rcv.window = 29200 << _rcv.window_scale;
1040 _snd.window = th->window << _snd.window_scale;
1041
1042 // Segment sequence number used for last window update
1043 _snd.wl1 = th->seq;
1044 // Segment acknowledgment number used for last window update
1045 _snd.wl2 = th->ack;
1046
1047 // Setup initial congestion window
1048 if (2190 < _snd.mss) {
1049 _snd.cwnd = 2 * _snd.mss;
1050 } else if (1095 < _snd.mss && _snd.mss <= 2190) {
1051 _snd.cwnd = 3 * _snd.mss;
1052 } else {
1053 _snd.cwnd = 4 * _snd.mss;
1054 }
1055
1056 // Setup initial slow start threshold
1057 _snd.ssthresh = th->window << _snd.window_scale;
1058}
1059
1060template <typename InetTraits>
1061Packet tcp<InetTraits>::tcb::get_transmit_packet() {
1062 // easy case: empty queue
1063 if (_snd.unsent.empty()) {
1064 return Packet();
1065 }
1066 auto can_send = this->can_send();
1067 // Max number of TCP payloads we can pass to NIC
1068 uint32_t len;
1069 if (_tcp.get_hw_features().tx_tso) {
1070 // FIXME: Info tap device the size of the splitted packet
1071 len = _tcp.get_hw_features().max_packet_len - tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
1072 } else {
1073 len = std::min(uint16_t(_tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
1074 }
1075 can_send = std::min(can_send, len);
1076 // easy case: one small packet
1077 if (_snd.unsent.front().len() <= can_send) {
1078 auto p = std::move(_snd.unsent.front());
1079 _snd.unsent.pop_front();
1080 _snd.unsent_len -= p.len();
1081 return p;
1082 }
1083 // moderate case: need to split one packet
1084 if (_snd.unsent.front().len() > can_send) {
1085 auto p = _snd.unsent.front().share(0, can_send);
1086 _snd.unsent.front().trim_front(can_send);
1087 _snd.unsent_len -= p.len();
1088 return p;
1089 }
1090 // hard case: merge some packets, possibly split last
1091 auto p = std::move(_snd.unsent.front());
1092 _snd.unsent.pop_front();
1093 can_send -= p.len();
1094 while (!_snd.unsent.empty()
1095 && _snd.unsent.front().len() <= can_send) {
1096 can_send -= _snd.unsent.front().len();
1097 p.append(std::move(_snd.unsent.front()));
1098 _snd.unsent.pop_front();
1099 }
1100 // FIXME: this will result in calling "deleter" of packet which free managed objects
1101 // will used later
1102 // if (!_snd.unsent.empty() && can_send) {
1103 // auto& q = _snd.unsent.front();
1104 // p.append(q.share(0, can_send));
1105 // q.trim_front(can_send);
1106 // }
1107 _snd.unsent_len -= p.len();
1108 return p;
1109}
1110
1111template <typename InetTraits>
1112void tcp<InetTraits>::tcb::output_one(bool data_retransmit) {
1113 if (in_state(CLOSED)) {
1114 return;
1115 }
1116
1117 Packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet();
1118 Packet clone = p.share(); // early clone to prevent share() from calling packet::unuse_internal_data() on header.
1119 uint16_t len = p.len();
1120 bool syn_on = syn_needs_on();
1121 bool ack_on = ack_needs_on();
1122
1123 auto options_size = _option.get_size(syn_on, ack_on);
1124 auto th = p.prepend_header<tcp_hdr>(options_size);
1125
1126 th->src_port = _local_port;
1127 th->dst_port = _foreign_port;
1128
1129 th->f_syn = syn_on;
1130 th->f_ack = ack_on;
1131 if (ack_on) {
1132 clear_delayed_ack();
1133 }
1134 th->f_urg = false;
1135 th->f_psh = false;
1136
1137 tcp_sequence seq;
1138 if (data_retransmit) {
1139 seq = _snd.unacknowledged;
1140 } else {
1141 seq = syn_on ? _snd.initial : _snd.next;
1142 _snd.next += len;
1143 }
1144 th->seq = seq;
1145 th->ack = _rcv.next;
1146 th->data_offset = (sizeof(*th) + options_size) / 4;
1147 th->window = _rcv.window >> _rcv.window_scale;
1148 th->checksum = 0;
1149
1150 // FIXME: does the FIN have to fit in the window?
1151 bool fin_on = fin_needs_on();
1152 th->f_fin = fin_on;
1153
1154 // Add tcp options
1155 _option.fill(th, options_size);
1156 *th = th->hton();
1157
1158 offload_info oi;
1159 checksummer csum;
1160 uint16_t pseudo_hdr_seg_len = 0;
1161
1162 oi.tcp_hdr_len = sizeof(tcp_hdr) + options_size;
1163
1164 if (_tcp.get_hw_features().tx_csum_l4_offload) {
1165 oi.needs_csum = true;
1166
1167 //
1168 // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's
1169 // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones'
1170 // complement sum of the pseudo header.
1171 //
1172 // For TSO the csum should be calculated for a pseudo header with
1173 // segment length set to 0. All the rest is the same as for a TCP Tx
1174 // CSUM offload case.
1175 //
1176 if (_tcp.get_hw_features().tx_tso && len > _snd.mss) {
1177 oi.tso_seg_size = _snd.mss;
1178 } else {
1179 pseudo_hdr_seg_len = sizeof(*th) + options_size + len;
1180 }
1181 } else {
1182 pseudo_hdr_seg_len = sizeof(*th) + options_size + len;
1183 oi.needs_csum = false;
1184 }
1185
1186 InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip,
1187 pseudo_hdr_seg_len);
1188
1189 if (_tcp.get_hw_features().tx_csum_l4_offload) {
1190 th->checksum = ~csum.get();
1191 } else {
1192 csum.sum(p);
1193 th->checksum = csum.get();
1194 }
1195
1196 oi.protocol = ip_protocol_num::tcp;
1197
1198 p.set_offload_info(oi);
1199
1200 if (!data_retransmit && (len || syn_on || fin_on)) {
1201 auto now = clock_type::now();
1202 if (len) {
1203 unsigned nr_transmits = 0;
1204 _snd.data.emplace_back(unacked_segment{std::move(clone),
1205 len, nr_transmits, now});
1206 }
1207 if (!retransmit_fd) {
1208 start_retransmit_timer();
1209 }
1210 }
1211
1212 queue_packet(std::move(p));
1213}
1214
1215template <typename InetTraits>
1216bool tcp<InetTraits>::tcb::is_all_data_acked() {
1217 if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) {
1218 return true;
1219 }
1220 return false;
1221}
1222
1223template <typename InetTraits>
1224Tub<Packet> tcp<InetTraits>::tcb::read() {
1225 Tub<Packet> p;
1226 if (_rcv.data.empty())
1227 return p;
1228
1229 p.construct();
1230 for (auto&& q : _rcv.data) {
1231 p->append(std::move(q));
1232 }
1233 _rcv.data.clear();
1234 return p;
1235}
1236
1237template <typename InetTraits>
1238int tcp<InetTraits>::tcb::send(Packet p) {
1239 // We can not send after the connection is closed
1240 assert(!_snd.closed);
1241
1242 if (in_state(CLOSED))
1243 return -ECONNRESET;
1244
1245 auto len = p.len();
1246 if (!_snd.user_queue_space.get_or_fail(len)) {
1247 // note: caller must ensure enough queue space to send
1248 ceph_abort();
1249 }
1250 // TODO: Handle p.len() > max user_queue_space case
1251 _snd.queued_len += len;
1252 _snd.unsent_len += len;
1253 _snd.queued_len -= len;
1254 _snd.unsent.push_back(std::move(p));
1255 if (can_send() > 0) {
1256 output();
1257 }
1258 return len;
1259}
1260
1261template <typename InetTraits>
1262void tcp<InetTraits>::tcb::close() {
1263 if (in_state(CLOSED) || _snd.closed) {
1264 return ;
1265 }
1266 // TODO: We should make this asynchronous
1267
1268 _errno = -EPIPE;
1269 center->delete_file_event(fd, EVENT_READABLE|EVENT_WRITABLE);
1270 bool acked = is_all_data_acked();
1271 if (!acked) {
1272 _snd._all_data_acked_fd = manager.get_eventfd();
1273 center->create_file_event(_snd._all_data_acked_fd, EVENT_READABLE, all_data_ack_event);
1274 } else {
1275 close_final_cleanup();
1276 }
1277}
1278
1279template <typename InetTraits>
1280bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) {
1281 // We've received a TSO packet, do ack immediately
1282 if (seg_len > _rcv.mss) {
1283 _nr_full_seg_received = 0;
1284 if (_delayed_ack_fd) {
1285 center->delete_time_event(*_delayed_ack_fd);
1286 _delayed_ack_fd.destroy();
1287 }
1288 return true;
1289 }
1290
1291 // We've received a full sized segment, ack for every second full sized segment
1292 if (seg_len == _rcv.mss) {
1293 if (_nr_full_seg_received++ >= 1) {
1294 _nr_full_seg_received = 0;
1295 if (_delayed_ack_fd) {
1296 center->delete_time_event(*_delayed_ack_fd);
1297 _delayed_ack_fd.destroy();
1298 }
1299 return true;
1300 }
1301 }
1302
1303 // If the timer is armed and its callback hasn't been run.
1304 if (_delayed_ack_fd) {
1305 return false;
1306 }
1307
1308 // If the timer is not armed, schedule a delayed ACK.
1309 // The maximum delayed ack timer allowed by RFC1122 is 500ms, most
1310 // implementations use 200ms.
1311 _delayed_ack_fd.construct(center->create_time_event(200*1000, delayed_ack_event));
1312 return false;
1313}
1314
1315template <typename InetTraits>
1316void tcp<InetTraits>::tcb::clear_delayed_ack() {
1317 if (_delayed_ack_fd) {
1318 center->delete_time_event(*_delayed_ack_fd);
1319 _delayed_ack_fd.destroy();
1320 }
1321}
1322
1323template <typename InetTraits>
1324bool tcp<InetTraits>::tcb::merge_out_of_order() {
1325 bool merged = false;
1326 if (_rcv.out_of_order.map.empty()) {
1327 return merged;
1328 }
1329 for (auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) {
1330 auto& p = it->second;
1331 auto seg_beg = it->first;
1332 auto seg_len = p.len();
1333 auto seg_end = seg_beg + seg_len;
1334 if (seg_beg <= _rcv.next && seg_end > _rcv.next) {
1335 // This segment has been received out of order and its previous
1336 // segment has been received now
1337 auto trim = _rcv.next - seg_beg;
1338 if (trim) {
1339 p.trim_front(trim);
1340 seg_len -= trim;
1341 }
1342 _rcv.next += seg_len;
1343 _rcv.data.push_back(std::move(p));
1344 // Since c++11, erase() always returns the value of the following element
1345 it = _rcv.out_of_order.map.erase(it);
1346 merged = true;
1347 } else if (_rcv.next >= seg_end) {
1348 // This segment has been receive already, drop it
1349 it = _rcv.out_of_order.map.erase(it);
1350 } else {
1351 // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only,
1352 // so we can stop looking here.
1353 it++;
1354 break;
1355 }
1356 }
1357 return merged;
1358}
1359
1360template <typename InetTraits>
1361void tcp<InetTraits>::tcb::insert_out_of_order(tcp_sequence seg, Packet p) {
1362 _rcv.out_of_order.merge(seg, std::move(p));
1363}
1364
1365template <typename InetTraits>
1366void tcp<InetTraits>::tcb::trim_receive_data_after_window() {
1367 abort();
1368}
1369
1370template <typename InetTraits>
1371void tcp<InetTraits>::tcb::fast_retransmit() {
1372 if (!_snd.data.empty()) {
1373 auto& unacked_seg = _snd.data.front();
1374 unacked_seg.nr_transmits++;
1375 retransmit_one();
1376 output();
1377 }
1378}
1379
1380template <typename InetTraits>
1381void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) {
1382 // Update RTO according to RFC6298
1383 auto R = std::chrono::duration_cast<std::chrono::microseconds>(clock_type::now() - tx_time);
1384 if (_snd.first_rto_sample) {
1385 _snd.first_rto_sample = false;
1386 // RTTVAR <- R/2
1387 // SRTT <- R
1388 _snd.rttvar = R / 2;
1389 _snd.srtt = R;
1390 } else {
1391 // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
1392 // SRTT <- (1 - alpha) * SRTT + alpha * R'
1393 // where alpha = 1/8 and beta = 1/4
1394 auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt);
1395 _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4;
1396 _snd.srtt = _snd.srtt * 7 / 8 + R / 8;
1397 }
1398 // RTO <- SRTT + max(G, K * RTTVAR)
1399 _rto = _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar);
1400
1401 // Make sure 1 sec << _rto << 60 sec
1402 _rto = std::max(_rto, _rto_min);
1403 _rto = std::min(_rto, _rto_max);
1404}
1405
1406template <typename InetTraits>
1407void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) {
1408 uint32_t smss = _snd.mss;
1409 if (_snd.cwnd < _snd.ssthresh) {
1410 // In slow start phase
1411 _snd.cwnd += std::min(acked_bytes, smss);
1412 } else {
1413 // In congestion avoidance phase
1414 uint32_t round_up = 1;
1415 _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd);
1416 }
1417}
1418
1419
1420template <typename InetTraits>
1421void tcp<InetTraits>::tcb::cleanup() {
1422 manager.notify(fd, EVENT_READABLE);
1423 _snd.closed = true;
1424 _snd.unsent.clear();
1425 _snd.data.clear();
1426 _rcv.out_of_order.map.clear();
1427 _rcv.data.clear();
1428 stop_retransmit_timer();
1429 clear_delayed_ack();
1430 center->dispatch_event_external(new tcp<InetTraits>::C_actual_remove_tcb(this));
1431 remove_from_tcbs();
1432}
1433
1434template <typename InetTraits>
1435tcp_sequence tcp<InetTraits>::tcb::get_isn() {
1436 // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers
1437 // with the expression:
1438 // ISN = M + F(localip, localport, remoteip, remoteport, secretkey)
1439 // M is the 4 microsecond timer
1440 using namespace std::chrono;
1441 uint32_t hash[4];
1442 hash[0] = _local_ip.ip;
1443 hash[1] = _foreign_ip.ip;
1444 hash[2] = (_local_port << 16) + _foreign_port;
1445 hash[3] = _isn_secret.key[15];
1446 CryptoPP::Weak::MD5::Transform(hash, _isn_secret.key);
1447 auto seq = hash[0];
1448 auto m = duration_cast<microseconds>(clock_type::now().time_since_epoch());
1449 seq += m.count() / 4;
1450 return make_seq(seq);
1451}
1452
1453template <typename InetTraits>
1454Tub<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
1455 _poll_active = false;
1456 if (_packetq.empty()) {
1457 output_one();
1458 }
1459
1460 Tub<typename InetTraits::l4packet> p;
1461 if (in_state(CLOSED)) {
1462 return p;
1463 }
1464
1465 assert(!_packetq.empty());
1466
1467 p = std::move(_packetq.front());
1468 _packetq.pop_front();
1469 if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0)) {
1470 // If there are packets to send in the queue or tcb is allowed to send
1471 // more add tcp back to polling set to keep sending. In addition, dupacks >= 3
1472 // is an indication that an segment is lost, stop sending more in this case.
1473 output();
1474 }
1475 return p;
1476}
1477
1478template <typename InetTraits>
1479void tcp<InetTraits>::connection::close_read() {
1480 // do nothing
1481 // _tcb->manager.notify(_tcb->fd, EVENT_READABLE);
1482}
1483
1484template <typename InetTraits>
1485void tcp<InetTraits>::connection::close_write() {
1486 _tcb->close();
1487}
1488
1489template <typename InetTraits>
1490constexpr uint16_t tcp<InetTraits>::tcb::_max_nr_retransmit;
1491
1492template <typename InetTraits>
1493constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_min;
1494
1495template <typename InetTraits>
1496constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_max;
1497
1498template <typename InetTraits>
1499constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_clk_granularity;
1500
1501template <typename InetTraits>
1502typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret;
1503
1504
1505#endif /* CEPH_DPDK_TCP_H_ */