]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/net/tcp.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / net / tcp.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/shared_ptr.hh>
25#include <seastar/core/queue.hh>
26#include <seastar/core/semaphore.hh>
11fdf7f2
TL
27#include <seastar/core/byteorder.hh>
28#include <seastar/core/metrics.hh>
29#include <seastar/net/net.hh>
30#include <seastar/net/ip_checksum.hh>
31#include <seastar/net/ip.hh>
32#include <seastar/net/const.hh>
33#include <seastar/net/packet-util.hh>
34#include <seastar/util/std-compat.hh>
35#include <unordered_map>
36#include <map>
37#include <functional>
38#include <deque>
39#include <chrono>
40#include <random>
41#include <stdexcept>
42#include <system_error>
43
44#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
45#include <cryptopp/md5.h>
46
47namespace seastar {
48
49using namespace std::chrono_literals;
50
51namespace net {
52
9f95a23c 53struct tcp_hdr;
11fdf7f2
TL
54
// Builds a std::system_error carrying errno value `err` in the system category.
inline auto tcp_error(int err) {
    return std::system_error(err, std::system_category());
}

// Error for a connection that was reset (ECONNRESET).
inline auto tcp_reset_error() {
    return tcp_error(ECONNRESET);
}

// Error for a connection attempt that was aborted (ECONNABORTED).
inline auto tcp_connect_error() {
    return tcp_error(ECONNABORTED);
}

// Error for a connection attempt that was refused (ECONNREFUSED).
inline auto tcp_refused_error() {
    return tcp_error(ECONNREFUSED);
}
70
// TCP connection states. Every state occupies its own bit so that several
// states can be OR-ed together into a mask (see operator| below).
enum class tcp_state : uint16_t {
    CLOSED          = (1 << 0),
    LISTEN          = (1 << 1),
    SYN_SENT        = (1 << 2),
    SYN_RECEIVED    = (1 << 3),
    ESTABLISHED     = (1 << 4),
    FIN_WAIT_1      = (1 << 5),
    FIN_WAIT_2      = (1 << 6),
    CLOSE_WAIT      = (1 << 7),
    CLOSING         = (1 << 8),
    LAST_ACK        = (1 << 9),
    TIME_WAIT       = (1 << 10)
};

// Combines two states (or state masks) into a mask containing both.
// constexpr so masks can be formed in constant expressions.
constexpr tcp_state operator|(tcp_state s1, tcp_state s2) noexcept {
    return tcp_state(uint16_t(s1) | uint16_t(s2));
}
88
// Debug trace helper: forwards its arguments to print() when the TCP_DEBUG
// macro evaluates to non-zero, and compiles to nothing otherwise.
template <typename... Args>
void tcp_debug(const char* fmt, Args&&... args) {
#if TCP_DEBUG
    print(fmt, std::forward<Args>(args)...);
#endif
}
95
96struct tcp_option {
97 // The kind and len field are fixed and defined in TCP protocol
98 enum class option_kind: uint8_t { mss = 2, win_scale = 3, sack = 4, timestamps = 8, nop = 1, eol = 0 };
99 enum class option_len: uint8_t { mss = 4, win_scale = 3, sack = 2, timestamps = 10, nop = 1, eol = 1 };
100 static void write(char* p, option_kind kind, option_len len) {
101 p[0] = static_cast<uint8_t>(kind);
102 if (static_cast<uint8_t>(len) > 1) {
103 p[1] = static_cast<uint8_t>(len);
104 }
105 }
106 struct mss {
107 static constexpr option_kind kind = option_kind::mss;
108 static constexpr option_len len = option_len::mss;
109 uint16_t mss;
110 static tcp_option::mss read(const char* p) {
111 tcp_option::mss x;
112 x.mss = read_be<uint16_t>(p + 2);
113 return x;
114 }
115 void write(char* p) const {
116 tcp_option::write(p, kind, len);
117 write_be<uint16_t>(p + 2, mss);
118 }
119 };
120 struct win_scale {
121 static constexpr option_kind kind = option_kind::win_scale;
122 static constexpr option_len len = option_len::win_scale;
123 uint8_t shift;
124 static tcp_option::win_scale read(const char* p) {
125 tcp_option::win_scale x;
126 x.shift = p[2];
127 return x;
128 }
129 void write(char* p) const {
130 tcp_option::write(p, kind, len);
131 p[2] = shift;
132 }
133 };
134 struct sack {
135 static constexpr option_kind kind = option_kind::sack;
136 static constexpr option_len len = option_len::sack;
137 static tcp_option::sack read(const char* p) {
138 return {};
139 }
140 void write(char* p) const {
141 tcp_option::write(p, kind, len);
142 }
143 };
144 struct timestamps {
145 static constexpr option_kind kind = option_kind::timestamps;
146 static constexpr option_len len = option_len::timestamps;
147 uint32_t t1;
148 uint32_t t2;
149 static tcp_option::timestamps read(const char* p) {
150 tcp_option::timestamps ts;
151 ts.t1 = read_be<uint32_t>(p + 2);
152 ts.t2 = read_be<uint32_t>(p + 6);
153 return ts;
154 }
155 void write(char* p) const {
156 tcp_option::write(p, kind, len);
157 write_be<uint32_t>(p + 2, t1);
158 write_be<uint32_t>(p + 6, t2);
159 }
160 };
161 struct nop {
162 static constexpr option_kind kind = option_kind::nop;
163 static constexpr option_len len = option_len::nop;
164 void write(char* p) const {
165 tcp_option::write(p, kind, len);
166 }
167 };
168 struct eol {
169 static constexpr option_kind kind = option_kind::eol;
170 static constexpr option_len len = option_len::eol;
171 void write(char* p) const {
172 tcp_option::write(p, kind, len);
173 }
174 };
175 static const uint8_t align = 4;
176
177 void parse(uint8_t* beg, uint8_t* end);
178 uint8_t fill(void* h, const tcp_hdr* th, uint8_t option_size);
179 uint8_t get_size(bool syn_on, bool ack_on);
180
181 // For option negotiattion
182 bool _mss_received = false;
183 bool _win_scale_received = false;
184 bool _timestamps_received = false;
185 bool _sack_received = false;
186
187 // Option data
188 uint16_t _remote_mss = 536;
189 uint16_t _local_mss;
190 uint8_t _remote_win_scale = 0;
191 uint8_t _local_win_scale = 0;
192};
193inline char*& operator+=(char*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
194inline const char*& operator+=(const char*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
195inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
196
// Strongly-typed wrapper around a raw 32-bit TCP sequence number.
struct tcp_seq {
    uint32_t raw;
};
200
201inline tcp_seq ntoh(tcp_seq s) {
202 return tcp_seq { ntoh(s.raw) };
203}
204
205inline tcp_seq hton(tcp_seq s) {
206 return tcp_seq { hton(s.raw) };
207}
208
209inline
210std::ostream& operator<<(std::ostream& os, tcp_seq s) {
211 return os << s.raw;
212}
213
214inline tcp_seq make_seq(uint32_t raw) { return tcp_seq{raw}; }
215inline tcp_seq& operator+=(tcp_seq& s, int32_t n) { s.raw += n; return s; }
216inline tcp_seq& operator-=(tcp_seq& s, int32_t n) { s.raw -= n; return s; }
217inline tcp_seq operator+(tcp_seq s, int32_t n) { return s += n; }
218inline tcp_seq operator-(tcp_seq s, int32_t n) { return s -= n; }
219inline int32_t operator-(tcp_seq s, tcp_seq q) { return s.raw - q.raw; }
220inline bool operator==(tcp_seq s, tcp_seq q) { return s.raw == q.raw; }
221inline bool operator!=(tcp_seq s, tcp_seq q) { return !(s == q); }
222inline bool operator<(tcp_seq s, tcp_seq q) { return s - q < 0; }
223inline bool operator>(tcp_seq s, tcp_seq q) { return q < s; }
224inline bool operator<=(tcp_seq s, tcp_seq q) { return !(s > q); }
225inline bool operator>=(tcp_seq s, tcp_seq q) { return !(s < q); }
226
227struct tcp_hdr {
228 static constexpr size_t len = 20;
229 uint16_t src_port;
230 uint16_t dst_port;
231 tcp_seq seq;
232 tcp_seq ack;
233 uint8_t rsvd1 : 4;
234 uint8_t data_offset : 4;
235 uint8_t f_fin : 1;
236 uint8_t f_syn : 1;
237 uint8_t f_rst : 1;
238 uint8_t f_psh : 1;
239 uint8_t f_ack : 1;
240 uint8_t f_urg : 1;
241 uint8_t rsvd2 : 2;
242 uint16_t window;
243 uint16_t checksum;
244 uint16_t urgent;
245 static tcp_hdr read(const char* p) {
246 tcp_hdr h;
247 h.src_port = read_be<uint16_t>(p + 0);
248 h.dst_port = read_be<uint16_t>(p + 2);
249 h.seq = tcp_seq{read_be<uint32_t>(p + 4)};
250 h.ack = tcp_seq{read_be<uint32_t>(p + 8)};
251 h.rsvd1 = p[12] & 15;
252 h.data_offset = uint8_t(p[12]) >> 4;
253 h.f_fin = (uint8_t(p[13]) >> 0) & 1;
254 h.f_syn = (uint8_t(p[13]) >> 1) & 1;
255 h.f_rst = (uint8_t(p[13]) >> 2) & 1;
256 h.f_psh = (uint8_t(p[13]) >> 3) & 1;
257 h.f_ack = (uint8_t(p[13]) >> 4) & 1;
258 h.f_urg = (uint8_t(p[13]) >> 5) & 1;
259 h.rsvd2 = (uint8_t(p[13]) >> 6) & 3;
260 h.window = read_be<uint16_t>(p + 14);
261 h.checksum = read_be<uint16_t>(p + 16);
262 h.urgent = read_be<uint16_t>(p + 18);
263 return h;
264 }
265 void write(char* p) const {
266 write_be<uint16_t>(p + 0, src_port);
267 write_be<uint16_t>(p + 2, dst_port);
268 write_be<uint32_t>(p + 4, seq.raw);
269 write_be<uint32_t>(p + 8, ack.raw);
270 p[12] = rsvd1 | (data_offset << 4);
271 p[13] = (f_fin << 0)
272 | (f_syn << 1)
273 | (f_rst << 2)
274 | (f_psh << 3)
275 | (f_ack << 4)
276 | (f_urg << 5)
277 | (rsvd2 << 6);
278 write_be<uint16_t>(p + 14, window);
279 write_be<uint16_t>(p + 16, checksum);
280 write_be<uint16_t>(p + 18, urgent);
281 }
282 static void write_nbo_checksum(char* p, uint16_t checksum_in_network_byte_order) {
283 std::copy_n(reinterpret_cast<const char*>(&checksum_in_network_byte_order), 2, p + 16);
284 }
285};
286
287struct tcp_tag {};
288using tcp_packet_merger = packet_merger<tcp_seq, tcp_tag>;
289
290template <typename InetTraits>
291class tcp {
292public:
293 using ipaddr = typename InetTraits::address_type;
294 using inet_type = typename InetTraits::inet_type;
295 using connid = l4connid<InetTraits>;
296 using connid_hash = typename connid::connid_hash;
297 class connection;
298 class listener;
299private:
300 class tcb;
301
302 class tcb : public enable_lw_shared_from_this<tcb> {
303 using clock_type = lowres_clock;
304 static constexpr tcp_state CLOSED = tcp_state::CLOSED;
305 static constexpr tcp_state LISTEN = tcp_state::LISTEN;
306 static constexpr tcp_state SYN_SENT = tcp_state::SYN_SENT;
307 static constexpr tcp_state SYN_RECEIVED = tcp_state::SYN_RECEIVED;
308 static constexpr tcp_state ESTABLISHED = tcp_state::ESTABLISHED;
309 static constexpr tcp_state FIN_WAIT_1 = tcp_state::FIN_WAIT_1;
310 static constexpr tcp_state FIN_WAIT_2 = tcp_state::FIN_WAIT_2;
311 static constexpr tcp_state CLOSE_WAIT = tcp_state::CLOSE_WAIT;
312 static constexpr tcp_state CLOSING = tcp_state::CLOSING;
313 static constexpr tcp_state LAST_ACK = tcp_state::LAST_ACK;
314 static constexpr tcp_state TIME_WAIT = tcp_state::TIME_WAIT;
315 tcp_state _state = CLOSED;
316 tcp& _tcp;
317 connection* _conn = nullptr;
318 promise<> _connect_done;
1e59de90 319 std::optional<promise<>> _fin_recvd_promise = promise<>();
11fdf7f2
TL
320 ipaddr _local_ip;
321 ipaddr _foreign_ip;
322 uint16_t _local_port;
323 uint16_t _foreign_port;
324 struct unacked_segment {
325 packet p;
326 uint16_t data_len;
327 unsigned nr_transmits;
328 clock_type::time_point tx_time;
329 };
330 struct send {
331 tcp_seq unacknowledged;
332 tcp_seq next;
333 uint32_t window;
334 uint8_t window_scale;
335 uint16_t mss;
336 tcp_seq urgent;
337 tcp_seq wl1;
338 tcp_seq wl2;
339 tcp_seq initial;
340 std::deque<unacked_segment> data;
341 std::deque<packet> unsent;
342 uint32_t unsent_len = 0;
343 bool closed = false;
344 promise<> _window_opened;
345 // Wait for all data are acked
f67539c2 346 std::optional<promise<>> _all_data_acked_promise;
11fdf7f2
TL
347 // Limit number of data queued into send queue
348 size_t max_queue_space = 212992;
349 size_t current_queue_space = 0;
350 // wait for there is at least one byte available in the queue
f67539c2 351 std::optional<promise<>> _send_available_promise;
11fdf7f2
TL
352 // Round-trip time variation
353 std::chrono::milliseconds rttvar;
354 // Smoothed round-trip time
355 std::chrono::milliseconds srtt;
356 bool first_rto_sample = true;
357 clock_type::time_point syn_tx_time;
358 // Congestion window
359 uint32_t cwnd;
360 // Slow start threshold
361 uint32_t ssthresh;
362 // Duplicated ACKs
363 uint16_t dupacks = 0;
364 unsigned syn_retransmit = 0;
365 unsigned fin_retransmit = 0;
366 uint32_t limited_transfer = 0;
367 uint32_t partial_ack = 0;
368 tcp_seq recover;
369 bool window_probe = false;
370 uint8_t zero_window_probing_out = 0;
371 } _snd;
372 struct receive {
373 tcp_seq next;
374 uint32_t window;
375 uint8_t window_scale;
376 uint16_t mss;
377 tcp_seq urgent;
378 tcp_seq initial;
379 std::deque<packet> data;
380 // The total size of data stored in std::deque<packet> data
381 size_t data_size = 0;
382 tcp_packet_merger out_of_order;
f67539c2 383 std::optional<promise<>> _data_received_promise;
11fdf7f2
TL
384 // The maximun memory buffer size allowed for receiving
385 // Currently, it is the same as default receive window size when window scaling is enabled
386 size_t max_receive_buf_size = 3737600;
387 } _rcv;
388 tcp_option _option;
389 timer<lowres_clock> _delayed_ack;
390 // Retransmission timeout
391 std::chrono::milliseconds _rto{1000};
392 std::chrono::milliseconds _persist_time_out{1000};
393 static constexpr std::chrono::milliseconds _rto_min{1000};
394 static constexpr std::chrono::milliseconds _rto_max{60000};
395 // Clock granularity
396 static constexpr std::chrono::milliseconds _rto_clk_granularity{1};
397 static constexpr uint16_t _max_nr_retransmit{5};
398 timer<lowres_clock> _retransmit;
399 timer<lowres_clock> _persist;
400 uint16_t _nr_full_seg_received = 0;
struct isn_secret {
    // 512 bits secret key for ISN generation
    uint32_t key[16];
    // Fills every key word straight from std::random_device. Expanding a
    // single random_device value through default_random_engine would give the
    // whole 512-bit key no more entropy than that one seed value.
    isn_secret () {
        std::random_device rd;
        std::uniform_int_distribution<uint32_t> dist{};
        for (auto& k : key) {
            k = dist(rd);
        }
    }
};
413 static isn_secret _isn_secret;
414 tcp_seq get_isn();
415 circular_buffer<typename InetTraits::l4packet> _packetq;
416 bool _poll_active = false;
417 uint32_t get_default_receive_window_size() {
418 // Linux's default window size
419 constexpr uint32_t size = 29200;
420 return size << _rcv.window_scale;
421 }
422 // Returns the current receive window according to available receiving buffer size
423 uint32_t get_modified_receive_window_size() {
424 uint32_t left = _rcv.data_size > _rcv.max_receive_buf_size ? 0 : _rcv.max_receive_buf_size - _rcv.data_size;
425 return std::min(left, get_default_receive_window_size());
426 }
427 public:
428 tcb(tcp& t, connid id);
429 void input_handle_listen_state(tcp_hdr* th, packet p);
430 void input_handle_syn_sent_state(tcp_hdr* th, packet p);
431 void input_handle_other_state(tcp_hdr* th, packet p);
432 void output_one(bool data_retransmit = false);
433 future<> wait_for_data();
1e59de90
TL
434 future<> wait_input_shutdown();
435 void abort_reader() noexcept;
11fdf7f2
TL
436 future<> wait_for_all_data_acked();
437 future<> wait_send_available();
438 future<> send(packet p);
439 void connect();
440 packet read();
1e59de90 441 void close() noexcept;
11fdf7f2
TL
442 void remove_from_tcbs() {
443 auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
444 _tcp._tcbs.erase(id);
445 }
f67539c2 446 std::optional<typename InetTraits::l4packet> get_packet();
11fdf7f2
TL
447 void output() {
448 if (!_poll_active) {
449 _poll_active = true;
9f95a23c
TL
450 // FIXME: future is discarded
451 (void)_tcp.poll_tcb(_foreign_ip, this->shared_from_this()).then_wrapped([this] (auto&& f) {
11fdf7f2
TL
452 try {
453 f.get();
454 } catch(arp_queue_full_error& ex) {
455 // retry later
456 _poll_active = false;
457 this->start_retransmit_timer();
458 } catch(arp_timeout_error& ex) {
459 if (this->in_state(SYN_SENT)) {
460 _connect_done.set_exception(ex);
461 this->cleanup();
462 }
463 // in other states connection should time out
464 }
465 });
466 }
467 }
468 future<> connect_done() {
469 return _connect_done.get_future();
470 }
471 tcp_state& state() {
472 return _state;
473 }
474 private:
475 void respond_with_reset(tcp_hdr* th);
476 bool merge_out_of_order();
477 void insert_out_of_order(tcp_seq seq, packet p);
478 void trim_receive_data_after_window();
479 bool should_send_ack(uint16_t seg_len);
1e59de90 480 void clear_delayed_ack() noexcept;
11fdf7f2
TL
481 packet get_transmit_packet();
482 void retransmit_one() {
483 bool data_retransmit = true;
484 output_one(data_retransmit);
485 }
486 void start_retransmit_timer() {
487 auto now = clock_type::now();
488 start_retransmit_timer(now);
489 };
490 void start_retransmit_timer(clock_type::time_point now) {
491 auto tp = now + _rto;
492 _retransmit.rearm(tp);
493 };
1e59de90 494 void stop_retransmit_timer() noexcept {
11fdf7f2
TL
495 _retransmit.cancel();
496 };
497 void start_persist_timer() {
498 auto now = clock_type::now();
499 start_persist_timer(now);
500 };
501 void start_persist_timer(clock_type::time_point now) {
502 auto tp = now + _persist_time_out;
503 _persist.rearm(tp);
504 };
505 void stop_persist_timer() {
506 _persist.cancel();
507 };
508 void persist();
509 void retransmit();
510 void fast_retransmit();
511 void update_rto(clock_type::time_point tx_time);
512 void update_cwnd(uint32_t acked_bytes);
513 void cleanup();
514 uint32_t can_send() {
515 if (_snd.window_probe) {
516 return 1;
517 }
518
519 // Can not send if send window is zero
520 if (_snd.window == 0) {
521 return 0;
522 }
523
524 // Can not send if send window is less than unacknowledged data size
525 auto window_used = uint32_t(_snd.next - _snd.unacknowledged);
526 if (window_used > _snd.window) {
527 return 0;
528 }
529
530 // Can not send more than advertised window allows or unsent data size
531 auto x = std::min(_snd.window - window_used, _snd.unsent_len);
532
533 // Can not send more than congestion window allows
534 x = std::min(_snd.cwnd, x);
535 if (_snd.dupacks == 1 || _snd.dupacks == 2) {
536 // RFC5681 Step 3.1
537 // Send cwnd + 2 * smss per RFC3042
538 auto flight = flight_size();
539 auto max = _snd.cwnd + 2 * _snd.mss;
540 x = flight <= max ? std::min(x, max - flight) : 0;
541 _snd.limited_transfer += x;
542 } else if (_snd.dupacks >= 3) {
543 // RFC5681 Step 3.5
544 // Sent 1 full-sized segment at most
545 x = std::min(uint32_t(_snd.mss), x);
546 }
547 return x;
548 }
549 uint32_t flight_size() {
550 uint32_t size = 0;
551 std::for_each(_snd.data.begin(), _snd.data.end(), [&] (unacked_segment& seg) { size += seg.p.len(); });
552 return size;
553 }
554 uint16_t local_mss() {
555 return _tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
556 }
557 void queue_packet(packet p) {
558 _packetq.emplace_back(typename InetTraits::l4packet{_foreign_ip, std::move(p)});
559 }
560 void signal_data_received() {
561 if (_rcv._data_received_promise) {
562 _rcv._data_received_promise->set_value();
563 _rcv._data_received_promise = {};
564 }
565 }
566 void signal_all_data_acked() {
567 if (_snd._all_data_acked_promise && _snd.unsent_len == 0) {
568 _snd._all_data_acked_promise->set_value();
569 _snd._all_data_acked_promise = {};
570 }
571 }
572 void signal_send_available() {
573 if (_snd._send_available_promise && _snd.max_queue_space > _snd.current_queue_space) {
574 _snd._send_available_promise->set_value();
575 _snd._send_available_promise = {};
576 }
577 }
578 void do_syn_sent() {
579 _state = SYN_SENT;
580 _snd.syn_tx_time = clock_type::now();
581 // Send <SYN> to remote
582 output();
583 }
584 void do_syn_received() {
585 _state = SYN_RECEIVED;
586 _snd.syn_tx_time = clock_type::now();
587 // Send <SYN,ACK> to remote
588 output();
589 }
590 void do_established() {
591 _state = ESTABLISHED;
592 update_rto(_snd.syn_tx_time);
593 _connect_done.set_value();
594 }
595 void do_reset() {
596 _state = CLOSED;
597 cleanup();
598 if (_rcv._data_received_promise) {
599 _rcv._data_received_promise->set_exception(tcp_reset_error());
f67539c2 600 _rcv._data_received_promise = std::nullopt;
11fdf7f2
TL
601 }
602 if (_snd._all_data_acked_promise) {
603 _snd._all_data_acked_promise->set_exception(tcp_reset_error());
f67539c2 604 _snd._all_data_acked_promise = std::nullopt;
11fdf7f2
TL
605 }
606 if (_snd._send_available_promise) {
607 _snd._send_available_promise->set_exception(tcp_reset_error());
f67539c2 608 _snd._send_available_promise = std::nullopt;
11fdf7f2
TL
609 }
610 }
611 void do_time_wait() {
612 // FIXME: Implement TIME_WAIT state timer
613 _state = TIME_WAIT;
614 cleanup();
615 }
616 void do_closed() {
617 _state = CLOSED;
618 cleanup();
619 }
620 void do_setup_isn() {
621 _snd.initial = get_isn();
622 _snd.unacknowledged = _snd.initial;
623 _snd.next = _snd.initial + 1;
624 _snd.recover = _snd.initial;
625 }
626 void do_local_fin_acked() {
627 _snd.unacknowledged += 1;
628 _snd.next += 1;
629 }
1e59de90 630 bool syn_needs_on() const noexcept {
11fdf7f2
TL
631 return in_state(SYN_SENT | SYN_RECEIVED);
632 }
1e59de90 633 bool fin_needs_on() const noexcept {
11fdf7f2
TL
634 return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed &&
635 _snd.unsent_len == 0;
636 }
1e59de90 637 bool ack_needs_on() const noexcept {
11fdf7f2
TL
638 return !in_state(CLOSED | LISTEN | SYN_SENT);
639 }
1e59de90 640 bool foreign_will_not_send() const noexcept {
11fdf7f2
TL
641 return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED);
642 }
1e59de90 643 bool in_state(tcp_state state) const noexcept {
11fdf7f2
TL
644 return uint16_t(_state) & uint16_t(state);
645 }
646 void exit_fast_recovery() {
647 _snd.dupacks = 0;
648 _snd.limited_transfer = 0;
649 _snd.partial_ack = 0;
650 }
651 uint32_t data_segment_acked(tcp_seq seg_ack);
652 bool segment_acceptable(tcp_seq seg_seq, unsigned seg_len);
653 void init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end);
654 friend class connection;
655 };
656 inet_type& _inet;
657 std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs;
658 std::unordered_map<uint16_t, listener*> _listening;
659 std::random_device _rd;
660 std::default_random_engine _e;
661 std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
662 circular_buffer<std::pair<lw_shared_ptr<tcb>, ethernet_address>> _poll_tcbs;
663 // queue for packets that do not belong to any tcb
664 circular_buffer<ipv4_traits::l4packet> _packetq;
665 semaphore _queue_space = {212992};
666 metrics::metric_groups _metrics;
667public:
9f95a23c
TL
668 const inet_type& inet() const {
669 return _inet;
670 }
11fdf7f2
TL
671 class connection {
672 lw_shared_ptr<tcb> _tcb;
673 public:
674 explicit connection(lw_shared_ptr<tcb> tcbp) : _tcb(std::move(tcbp)) { _tcb->_conn = this; }
675 connection(const connection&) = delete;
676 connection(connection&& x) noexcept : _tcb(std::move(x._tcb)) {
677 _tcb->_conn = this;
678 }
679 ~connection();
680 void operator=(const connection&) = delete;
681 connection& operator=(connection&& x) {
682 if (this != &x) {
683 this->~connection();
684 new (this) connection(std::move(x));
685 }
686 return *this;
687 }
688 future<> connected() {
689 return _tcb->connect_done();
690 }
691 future<> send(packet p) {
692 return _tcb->send(std::move(p));
693 }
694 future<> wait_for_data() {
695 return _tcb->wait_for_data();
696 }
1e59de90
TL
697 future<> wait_input_shutdown() {
698 return _tcb->wait_input_shutdown();
699 }
11fdf7f2
TL
700 packet read() {
701 return _tcb->read();
702 }
703 ipaddr foreign_ip() {
704 return _tcb->_foreign_ip;
705 }
706 uint16_t foreign_port() {
707 return _tcb->_foreign_port;
708 }
20effc67
TL
709 ipaddr local_ip() {
710 return _tcb->_local_ip;
711 }
712 uint16_t local_port() {
713 return _tcb->_local_port;
714 }
11fdf7f2 715 void shutdown_connect();
1e59de90
TL
716 void close_read() noexcept;
717 void close_write() noexcept;
11fdf7f2
TL
718 };
719 class listener {
720 tcp& _tcp;
721 uint16_t _port;
722 queue<connection> _q;
723 size_t _pending = 0;
724 private:
725 listener(tcp& t, uint16_t port, size_t queue_length)
726 : _tcp(t), _port(port), _q(queue_length) {
727 _tcp._listening.emplace(_port, this);
728 }
729 public:
730 listener(listener&& x)
731 : _tcp(x._tcp), _port(x._port), _q(std::move(x._q)) {
732 _tcp._listening[_port] = this;
733 x._port = 0;
734 }
735 ~listener() {
736 if (_port) {
737 _tcp._listening.erase(_port);
738 }
739 }
740 future<connection> accept() {
741 return _q.not_empty().then([this] {
742 return make_ready_future<connection>(_q.pop());
743 });
744 }
745 void abort_accept() {
746 _q.abort(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
747 }
748 bool full() { return _pending + _q.size() >= _q.max_size(); }
749 void inc_pending() { _pending++; }
750 void dec_pending() { _pending--; }
9f95a23c
TL
751
752 const tcp& get_tcp() const {
753 return _tcp;
754 }
755 uint16_t port() const {
756 return _port;
757 }
11fdf7f2
TL
758 friend class tcp;
759 };
760public:
761 explicit tcp(inet_type& inet);
762 void received(packet p, ipaddr from, ipaddr to);
763 bool forward(forward_hash& out_hash_data, packet& p, size_t off);
764 listener listen(uint16_t port, size_t queue_length = 100);
765 connection connect(socket_address sa);
766 const net::hw_features& hw_features() const { return _inet._inet.hw_features(); }
767 future<> poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb);
768 void add_connected_tcb(lw_shared_ptr<tcb> tcbp, uint16_t local_port) {
769 auto it = _listening.find(local_port);
770 if (it != _listening.end()) {
771 it->second->_q.push(connection(tcbp));
772 it->second->dec_pending();
773 }
774 }
775private:
776 void send_packet_without_tcb(ipaddr from, ipaddr to, packet p);
777 void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
778 friend class listener;
779};
780
781template <typename InetTraits>
782tcp<InetTraits>::tcp(inet_type& inet)
783 : _inet(inet)
784 , _e(_rd()) {
785 namespace sm = metrics;
786
787 _metrics.add_group("tcp", {
1e59de90 788 sm::make_counter("linearizations", [] { return tcp_packet_merger::linearizations(); },
11fdf7f2
TL
789 sm::description("Counts a number of times a buffer linearization was invoked during the buffers merge process. "
790 "Divide it by a total TCP receive packet rate to get an everage number of lineraizations per TCP packet."))
791 });
792
793 _inet.register_packet_provider([this, tcb_polled = 0u] () mutable {
f67539c2 794 std::optional<typename InetTraits::l4packet> l4p;
11fdf7f2
TL
795 auto c = _poll_tcbs.size();
796 if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
797 l4p = std::move(_packetq.front());
798 _packetq.pop_front();
799 _queue_space.signal(l4p.value().p.len());
800 } else {
801 while (c--) {
802 tcb_polled++;
803 lw_shared_ptr<tcb> tcb;
804 ethernet_address dst;
805 std::tie(tcb, dst) = std::move(_poll_tcbs.front());
806 _poll_tcbs.pop_front();
807 l4p = tcb->get_packet();
808 if (l4p) {
809 l4p.value().e_dst = dst;
810 break;
811 }
812 }
813 }
814 return l4p;
815 });
816}
817
818template <typename InetTraits>
819future<> tcp<InetTraits>::poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb) {
820 return _inet.get_l2_dst_address(to).then([this, tcb = std::move(tcb)] (ethernet_address dst) {
821 _poll_tcbs.emplace_back(std::move(tcb), dst);
822 });
823}
824
825template <typename InetTraits>
826auto tcp<InetTraits>::listen(uint16_t port, size_t queue_length) -> listener {
827 return listener(*this, port, queue_length);
828}
829
830template <typename InetTraits>
831auto tcp<InetTraits>::connect(socket_address sa) -> connection {
832 uint16_t src_port;
833 connid id;
834 auto src_ip = _inet._inet.host_address();
835 auto dst_ip = ipv4_address(sa);
836 auto dst_port = net::ntoh(sa.u.in.sin_port);
837
838 do {
839 src_port = _port_dist(_e);
840 id = connid{src_ip, dst_ip, src_port, dst_port};
841 } while (_inet._inet.netif()->hw_queues_count() > 1 &&
f67539c2 842 (_inet._inet.netif()->hash2cpu(id.hash(_inet._inet.netif()->rss_key())) != this_shard_id()
11fdf7f2
TL
843 || _tcbs.find(id) != _tcbs.end()));
844
845 auto tcbp = make_lw_shared<tcb>(*this, id);
846 _tcbs.insert({id, tcbp});
847 tcbp->connect();
848 return connection(tcbp);
849}
850
851template <typename InetTraits>
852bool tcp<InetTraits>::forward(forward_hash& out_hash_data, packet& p, size_t off) {
853 auto th = p.get_header(off, tcp_hdr::len);
854 if (th) {
855 // src_port, dst_port in network byte order
856 out_hash_data.push_back(uint8_t(th[0]));
857 out_hash_data.push_back(uint8_t(th[1]));
858 out_hash_data.push_back(uint8_t(th[2]));
859 out_hash_data.push_back(uint8_t(th[3]));
860 }
861 return true;
862}
863
864template <typename InetTraits>
865void tcp<InetTraits>::received(packet p, ipaddr from, ipaddr to) {
866 auto th = p.get_header(0, tcp_hdr::len);
867 if (!th) {
868 return;
869 }
870 // data_offset is correct even before ntoh()
871 auto data_offset = uint8_t(th[12]) >> 4;
872 if (size_t(data_offset * 4) < tcp_hdr::len) {
873 return;
874 }
875
876 if (!hw_features().rx_csum_offload) {
877 checksummer csum;
878 InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
879 csum.sum(p);
880 if (csum.get() != 0) {
881 return;
882 }
883 }
884 auto h = tcp_hdr::read(th);
885 auto id = connid{to, from, h.dst_port, h.src_port};
886 auto tcbi = _tcbs.find(id);
887 lw_shared_ptr<tcb> tcbp;
888 if (tcbi == _tcbs.end()) {
889 auto listener = _listening.find(id.local_port);
890 if (listener == _listening.end() || listener->second->full()) {
891 // 1) In CLOSE state
892 // 1.1 all data in the incoming segment is discarded. An incoming
893 // segment containing a RST is discarded. An incoming segment not
894 // containing a RST causes a RST to be sent in response.
895 // FIXME:
896 // if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK>
897 // if ACK on: <SEQ=SEG.ACK><CTL=RST>
898 return respond_with_reset(&h, id.local_ip, id.foreign_ip);
899 } else {
900 // 2) In LISTEN state
901 // 2.1 first check for an RST
902 if (h.f_rst) {
903 // An incoming RST should be ignored
904 return;
905 }
906 // 2.2 second check for an ACK
907 if (h.f_ack) {
908 // Any acknowledgment is bad if it arrives on a connection
909 // still in the LISTEN state.
910 // <SEQ=SEG.ACK><CTL=RST>
911 return respond_with_reset(&h, id.local_ip, id.foreign_ip);
912 }
913 // 2.3 third check for a SYN
914 if (h.f_syn) {
915 // check the security
916 // NOTE: Ignored for now
917 tcbp = make_lw_shared<tcb>(*this, id);
918 _tcbs.insert({id, tcbp});
919 // TODO: we need to remove the tcb and decrease the pending if
920 // it stays SYN_RECEIVED state forever.
921 listener->second->inc_pending();
922
923 return tcbp->input_handle_listen_state(&h, std::move(p));
924 }
925 // 2.4 fourth other text or control
926 // So you are unlikely to get here, but if you do, drop the
927 // segment, and return.
928 return;
929 }
930 } else {
931 tcbp = tcbi->second;
932 if (tcbp->state() == tcp_state::SYN_SENT) {
933 // 3) In SYN_SENT State
934 return tcbp->input_handle_syn_sent_state(&h, std::move(p));
935 } else {
936 // 4) In other state, can be one of the following:
937 // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
938 // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
939 return tcbp->input_handle_other_state(&h, std::move(p));
940 }
941 }
942}
943
944// Send packet does not belong to any tcb
945template <typename InetTraits>
946void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, packet p) {
947 if (_queue_space.try_wait(p.len())) { // drop packets that do not fit the queue
9f95a23c
TL
948 // FIXME: future is discarded
949 (void)_inet.get_l2_dst_address(to).then([this, to, p = std::move(p)] (ethernet_address e_dst) mutable {
11fdf7f2
TL
950 _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
951 });
952 }
953}
954
955template <typename InetTraits>
956tcp<InetTraits>::connection::~connection() {
957 if (_tcb) {
958 _tcb->_conn = nullptr;
959 close_read();
960 close_write();
961 }
962}
963
964template <typename InetTraits>
965tcp<InetTraits>::tcb::tcb(tcp& t, connid id)
966 : _tcp(t)
967 , _local_ip(id.local_ip)
968 , _foreign_ip(id.foreign_ip)
969 , _local_port(id.local_port)
970 , _foreign_port(id.foreign_port)
971 , _delayed_ack([this] { _nr_full_seg_received = 0; output(); })
972 , _retransmit([this] { retransmit(); })
973 , _persist([this] { persist(); }) {
974}
975
976template <typename InetTraits>
977void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth) {
978 _tcp.respond_with_reset(rth, _local_ip, _foreign_ip);
979}
980
981template <typename InetTraits>
982void tcp<InetTraits>::respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip) {
983 if (rth->f_rst) {
984 return;
985 }
986 packet p;
987 auto th = p.prepend_uninitialized_header(tcp_hdr::len);
988 auto h = tcp_hdr{};
989 h.src_port = rth->dst_port;
990 h.dst_port = rth->src_port;
991 if (rth->f_ack) {
992 h.seq = rth->ack;
993 }
994 // If this RST packet is in response to a SYN packet. We ACK the ISN.
995 if (rth->f_syn) {
996 h.ack = rth->seq + 1;
997 h.f_ack = true;
998 }
999 h.f_rst = true;
1000 h.data_offset = tcp_hdr::len / 4;
1001 h.checksum = 0;
1002 h.write(th);
1003
1004 checksummer csum;
1005 offload_info oi;
1006 InetTraits::tcp_pseudo_header_checksum(csum, local_ip, foreign_ip, tcp_hdr::len);
1007 uint16_t checksum;
1008 if (hw_features().tx_csum_l4_offload) {
1009 checksum = ~csum.get();
1010 oi.needs_csum = true;
1011 } else {
1012 csum.sum(p);
1013 checksum = csum.get();
1014 oi.needs_csum = false;
1015 }
1016 tcp_hdr::write_nbo_checksum(th, checksum);
1017
1018 oi.protocol = ip_protocol_num::tcp;
1019 oi.tcp_hdr_len = tcp_hdr::len;
1020 p.set_offload_info(oi);
1021
1022 send_packet_without_tcb(local_ip, foreign_ip, std::move(p));
1023}
1024
1025template <typename InetTraits>
1026uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_seq seg_ack) {
1027 uint32_t total_acked_bytes = 0;
1028 // Full ACK of segment
1029 while (!_snd.data.empty()
1030 && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) {
1031 auto acked_bytes = _snd.data.front().p.len();
1032 _snd.unacknowledged += acked_bytes;
1033 // Ignore retransmitted segments when setting the RTO
1034 if (_snd.data.front().nr_transmits == 0) {
1035 update_rto(_snd.data.front().tx_time);
1036 }
1037 update_cwnd(acked_bytes);
1038 total_acked_bytes += acked_bytes;
1039 _snd.current_queue_space -= _snd.data.front().data_len;
1040 signal_send_available();
1041 _snd.data.pop_front();
1042 }
1043 // Partial ACK of segment
1044 if (_snd.unacknowledged < seg_ack) {
1045 auto acked_bytes = seg_ack - _snd.unacknowledged;
1046 if (!_snd.data.empty()) {
1047 auto& unacked_seg = _snd.data.front();
1048 unacked_seg.p.trim_front(acked_bytes);
1049 }
1050 _snd.unacknowledged = seg_ack;
1051 update_cwnd(acked_bytes);
1052 total_acked_bytes += acked_bytes;
1053 }
1054 return total_acked_bytes;
1055}
1056
1057template <typename InetTraits>
1058bool tcp<InetTraits>::tcb::segment_acceptable(tcp_seq seg_seq, unsigned seg_len) {
1059 if (seg_len == 0 && _rcv.window == 0) {
1060 // SEG.SEQ = RCV.NXT
1061 return seg_seq == _rcv.next;
1062 } else if (seg_len == 0 && _rcv.window > 0) {
1063 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1064 return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window);
1065 } else if (seg_len > 0 && _rcv.window > 0) {
1066 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1067 // or
1068 // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
1069 bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window);
1070 bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window);
1071 return x || y;
1072 } else {
1073 // SEG.LEN > 0 RCV.WND = 0, not acceptable
1074 return false;
1075 }
1076}
1077
1078template <typename InetTraits>
1079void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) {
1080 // Handle tcp options
1081 _option.parse(opt_start, opt_end);
1082
1083 // Remote receive window scale factor
1084 _snd.window_scale = _option._remote_win_scale;
1085 // Local receive window scale factor
1086 _rcv.window_scale = _option._local_win_scale;
1087
1088 // Maximum segment size remote can receive
1089 _snd.mss = _option._remote_mss;
1090 // Maximum segment size local can receive
1091 _rcv.mss = _option._local_mss = local_mss();
1092
1093 _rcv.window = get_default_receive_window_size();
1094 _snd.window = th->window << _snd.window_scale;
1095
1096 // Segment sequence number used for last window update
1097 _snd.wl1 = th->seq;
1098 // Segment acknowledgment number used for last window update
1099 _snd.wl2 = th->ack;
1100
1101 // Setup initial congestion window
1102 if (2190 < _snd.mss) {
1103 _snd.cwnd = 2 * _snd.mss;
1104 } else if (1095 < _snd.mss && _snd.mss <= 2190) {
1105 _snd.cwnd = 3 * _snd.mss;
1106 } else {
1107 _snd.cwnd = 4 * _snd.mss;
1108 }
1109
1110 // Setup initial slow start threshold
1111 _snd.ssthresh = th->window << _snd.window_scale;
1112}
1113
1114template <typename InetTraits>
1115void tcp<InetTraits>::tcb::input_handle_listen_state(tcp_hdr* th, packet p) {
1116 auto opt_len = th->data_offset * 4 - tcp_hdr::len;
1117 auto opt_start = reinterpret_cast<uint8_t*>(p.get_header(0, th->data_offset * 4)) + tcp_hdr::len;
1118 auto opt_end = opt_start + opt_len;
1119 p.trim_front(th->data_offset * 4);
1120 tcp_seq seg_seq = th->seq;
1121
1122 // Set RCV.NXT to SEG.SEQ+1, IRS is set to SEG.SEQ
1123 _rcv.next = seg_seq + 1;
1124 _rcv.initial = seg_seq;
1125
1126 // ISS should be selected and a SYN segment sent of the form:
1127 // <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
1128 // SND.NXT is set to ISS+1 and SND.UNA to ISS
1129 // NOTE: In previous code, _snd.next is set to ISS + 1 only when SYN is
1130 // ACKed. Now, we set _snd.next to ISS + 1 here, so in output_one(): we
1131 // have
1132 // th->seq = syn_on ? _snd.initial : _snd.next
1133 // to make sure retransmitted SYN has correct SEQ number.
1134 do_setup_isn();
1135
1136 _rcv.urgent = _rcv.next;
1137
1138 tcp_debug("listen: LISTEN -> SYN_RECEIVED\n");
1139 init_from_options(th, opt_start, opt_end);
1140 do_syn_received();
1141}
1142
1143template <typename InetTraits>
1144void tcp<InetTraits>::tcb::input_handle_syn_sent_state(tcp_hdr* th, packet p) {
1145 auto opt_len = th->data_offset * 4 - tcp_hdr::len;
1146 auto opt_start = reinterpret_cast<uint8_t*>(p.get_header(0, th->data_offset * 4)) + tcp_hdr::len;
1147 auto opt_end = opt_start + opt_len;
1148 p.trim_front(th->data_offset * 4);
1149 tcp_seq seg_seq = th->seq;
1150 auto seg_ack = th->ack;
1151
1152 bool acceptable = false;
1153 // 3.1 first check the ACK bit
1154 if (th->f_ack) {
1155 // If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset (unless the
1156 // RST bit is set, if so drop the segment and return)
1157 if (seg_ack <= _snd.initial || seg_ack > _snd.next) {
1158 return respond_with_reset(th);
1159 }
1160
1161 // If SND.UNA =< SEG.ACK =< SND.NXT then the ACK is acceptable.
1162 acceptable = _snd.unacknowledged <= seg_ack && seg_ack <= _snd.next;
1163 }
1164
1165 // 3.2 second check the RST bit
1166 if (th->f_rst) {
1167 // If the ACK was acceptable then signal the user "error: connection
1168 // reset", drop the segment, enter CLOSED state, delete TCB, and
1169 // return. Otherwise (no ACK) drop the segment and return.
1170 if (acceptable) {
1e59de90 1171 _connect_done.set_exception(tcp_refused_error());
11fdf7f2
TL
1172 return do_reset();
1173 } else {
1174 return;
1175 }
1176 }
1177
1178 // 3.3 third check the security and precedence
1179 // NOTE: Ignored for now
1180
1181 // 3.4 fourth check the SYN bit
1182 if (th->f_syn) {
1183 // RCV.NXT is set to SEG.SEQ+1, IRS is set to SEG.SEQ. SND.UNA should
1184 // be advanced to equal SEG.ACK (if there is an ACK), and any segments
1185 // on the retransmission queue which are thereby acknowledged should be
1186 // removed.
1187 _rcv.next = seg_seq + 1;
1188 _rcv.initial = seg_seq;
1189 if (th->f_ack) {
1190 // TODO: clean retransmission queue
1191 _snd.unacknowledged = seg_ack;
1192 }
1193 if (_snd.unacknowledged > _snd.initial) {
1194 // If SND.UNA > ISS (our SYN has been ACKed), change the connection
1195 // state to ESTABLISHED, form an ACK segment
1196 // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
1197 tcp_debug("syn: SYN_SENT -> ESTABLISHED\n");
1198 init_from_options(th, opt_start, opt_end);
1199 do_established();
1200 output();
1201 } else {
1202 // Otherwise enter SYN_RECEIVED, form a SYN,ACK segment
1203 // <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
1204 tcp_debug("syn: SYN_SENT -> SYN_RECEIVED\n");
1205 do_syn_received();
1206 }
1207 }
1208
1209 // 3.5 fifth, if neither of the SYN or RST bits is set then drop the
1210 // segment and return.
1211 return;
1212}
1213
// Handles an incoming segment for a tcb that is in any state other than
// LISTEN or SYN_SENT. Processing follows the numbered steps in the comments
// below (4.1 - 4.8): sequence number check, RST, security (ignored), SYN,
// ACK, URG (TODO), segment text and FIN.
//
// th - parsed TCP header of the segment
// p  - the packet; its TCP header (th->data_offset * 4 bytes) is trimmed off
//      first, so that only the payload remains.
//
// do_output requests an ACK-carrying output, do_output_data requests an
// attempt to send more data; both are acted upon at the very end.
template <typename InetTraits>
void tcp<InetTraits>::tcb::input_handle_other_state(tcp_hdr* th, packet p) {
    p.trim_front(th->data_offset * 4);
    bool do_output = false;
    bool do_output_data = false;
    tcp_seq seg_seq = th->seq;
    auto seg_ack = th->ack;
    auto seg_len = p.len();

    // 4.1 first check sequence number
    if (!segment_acceptable(seg_seq, seg_len)) {
        //<SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
        return output();
    }

    // In the following it is assumed that the segment is the idealized
    // segment that begins at RCV.NXT and does not exceed the window.
    if (seg_seq < _rcv.next) {
        // ignore already acknowledged data
        auto dup = std::min(uint32_t(_rcv.next - seg_seq), seg_len);
        p.trim_front(dup);
        seg_len -= dup;
        seg_seq += dup;
    }
    // FIXME: We should trim data outside the right edge of the receive window as well

    if (seg_seq != _rcv.next) {
        insert_out_of_order(seg_seq, std::move(p));
        // A TCP receiver SHOULD send an immediate duplicate ACK
        // when an out-of-order segment arrives.
        return output();
    }

    // 4.2 second check the RST bit
    if (th->f_rst) {
        if (in_state(SYN_RECEIVED)) {
            // If this connection was initiated with a passive OPEN (i.e.,
            // came from the LISTEN state), then return this connection to
            // LISTEN state and return. The user need not be informed. If
            // this connection was initiated with an active OPEN (i.e., came
            // from SYN_SENT state) then the connection was refused, signal
            // the user "connection refused". In either case, all segments
            // on the retransmission queue should be removed. And in the
            // active OPEN case, enter the CLOSED state and delete the TCB,
            // and return.
            _connect_done.set_exception(tcp_refused_error());
            return do_reset();
        }
        if (in_state(ESTABLISHED | FIN_WAIT_1 | FIN_WAIT_2 | CLOSE_WAIT)) {
            // If the RST bit is set then, any outstanding RECEIVEs and SEND
            // should receive "reset" responses. All segment queues should be
            // flushed. Users should also receive an unsolicited general
            // "connection reset" signal. Enter the CLOSED state, delete the
            // TCB, and return.
            return do_reset();
        }
        if (in_state(CLOSING | LAST_ACK | TIME_WAIT)) {
            // If the RST bit is set then, enter the CLOSED state, delete the
            // TCB, and return.
            return do_closed();
        }
    }

    // 4.3 third check security and precedence
    // NOTE: Ignored for now

    // 4.4 fourth, check the SYN bit
    if (th->f_syn) {
        // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
        // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT

        // If the SYN is in the window it is an error, send a reset, any
        // outstanding RECEIVEs and SEND should receive "reset" responses,
        // all segment queues should be flushed, the user should also
        // receive an unsolicited general "connection reset" signal, enter
        // the CLOSED state, delete the TCB, and return.
        respond_with_reset(th);
        return do_reset();

        // If the SYN is not in the window this step would not be reached
        // and an ack would have been sent in the first step (sequence
        // number check).
    }

    // 4.5 fifth check the ACK field
    if (!th->f_ack) {
        // if the ACK bit is off drop the segment and return
        return;
    } else {
        // SYN_RECEIVED STATE
        if (in_state(SYN_RECEIVED)) {
            // If SND.UNA =< SEG.ACK =< SND.NXT then enter ESTABLISHED state
            // and continue processing.
            if (_snd.unacknowledged <= seg_ack && seg_ack <= _snd.next) {
                tcp_debug("SYN_RECEIVED -> ESTABLISHED\n");
                do_established();
                _tcp.add_connected_tcb(this->shared_from_this(), _local_port);
            } else {
                // <SEQ=SEG.ACK><CTL=RST>
                return respond_with_reset(th);
            }
        }
        // Takes over the window advertised by this segment, remembers the
        // SEQ/ACK it came with (wl1/wl2) and starts or stops the persist
        // timer depending on whether the new window is zero.
        auto update_window = [this, th, seg_seq, seg_ack] {
            tcp_debug("window update seg_seq=%d, seg_ack=%d, old window=%d new window=%d\n",
                      seg_seq, seg_ack, _snd.window, th->window << _snd.window_scale);
            _snd.window = th->window << _snd.window_scale;
            _snd.wl1 = seg_seq;
            _snd.wl2 = seg_ack;
            _snd.zero_window_probing_out = 0;
            if (_snd.window == 0) {
                _persist_time_out = _rto;
                start_persist_timer();
            } else {
                stop_persist_timer();
            }
        };
        // ESTABLISHED STATE or
        // CLOSE_WAIT STATE: Do the same processing as for the ESTABLISHED state.
        if (in_state(ESTABLISHED | CLOSE_WAIT)){
            // When we are in zero window probing phase and packets_out = 0 we bypass "duplicated ack" check
            auto packets_out = _snd.next - _snd.unacknowledged - _snd.zero_window_probing_out;
            // If SND.UNA < SEG.ACK =< SND.NXT then, set SND.UNA <- SEG.ACK.
            if (_snd.unacknowledged < seg_ack && seg_ack <= _snd.next) {
                // Remote ACKed data we sent
                auto acked_bytes = data_segment_acked(seg_ack);

                // If SND.UNA < SEG.ACK =< SND.NXT, the send window should be updated.
                if (_snd.wl1 < seg_seq || (_snd.wl1 == seg_seq && _snd.wl2 <= seg_ack)) {
                    update_window();
                }

                // some data is acked, try send more data
                do_output_data = true;

                auto set_retransmit_timer = [this] {
                    if (_snd.data.empty()) {
                        // All outstanding segments are acked, turn off the timer.
                        stop_retransmit_timer();
                        // Signal the waiter of this event
                        signal_all_data_acked();
                    } else {
                        // Restart the timer because new data is acked.
                        start_retransmit_timer();
                    }
                };

                if (_snd.dupacks >= 3) {
                    // We are in fast retransmit / fast recovery phase
                    uint32_t smss = _snd.mss;
                    if (seg_ack > _snd.recover) {
                        tcp_debug("ack: full_ack\n");
                        // Set cwnd to min (ssthresh, max(FlightSize, SMSS) + SMSS)
                        _snd.cwnd = std::min(_snd.ssthresh, std::max(flight_size(), smss) + smss);
                        // Exit the fast recovery procedure
                        exit_fast_recovery();
                        set_retransmit_timer();
                    } else {
                        tcp_debug("ack: partial_ack\n");
                        // Retransmit the first unacknowledged segment
                        fast_retransmit();
                        // Deflate the congestion window by the amount of new data
                        // acknowledged by the Cumulative Acknowledgment field
                        _snd.cwnd -= acked_bytes;
                        // If the partial ACK acknowledges at least one SMSS of new
                        // data, then add back SMSS bytes to the congestion window
                        if (acked_bytes >= smss) {
                            _snd.cwnd += smss;
                        }
                        // Send a new segment if permitted by the new value of
                        // cwnd.  Do not exit the fast recovery procedure For
                        // the first partial ACK that arrives during fast
                        // recovery, also reset the retransmit timer.
                        if (++_snd.partial_ack == 1) {
                            start_retransmit_timer();
                        }
                    }
                } else {
                    // RFC5681: The fast retransmit algorithm uses the arrival
                    // of 3 duplicate ACKs (as defined in section 2, without
                    // any intervening ACKs which move SND.UNA) as an
                    // indication that a segment has been lost.
                    //
                    // So, here we reset dupacks to zero because this ACK moves
                    // SND.UNA.
                    exit_fast_recovery();
                    set_retransmit_timer();
                }
            } else if ((packets_out > 0) && !_snd.data.empty() && seg_len == 0 &&
                th->f_fin == 0 && th->f_syn == 0 &&
                th->ack == _snd.unacknowledged &&
                uint32_t(th->window << _snd.window_scale) == _snd.window) {
                // Note:
                // RFC793 states:
                // If the ACK is a duplicate (SEG.ACK < SND.UNA), it can be ignored
                // RFC5681 states:
                // The TCP sender SHOULD use the "fast retransmit" algorithm to detect
                // and repair loss, based on incoming duplicate ACKs.
                // Here, We follow RFC5681.
                _snd.dupacks++;
                uint32_t smss = _snd.mss;
                // 3 duplicated ACKs trigger a fast retransmit
                if (_snd.dupacks == 1 || _snd.dupacks == 2) {
                    // RFC5681 Step 3.1
                    // Send cwnd + 2 * smss per RFC3042
                    do_output_data = true;
                } else if (_snd.dupacks == 3) {
                    // RFC6582 Step 3.2
                    if (seg_ack - 1 > _snd.recover) {
                        _snd.recover = _snd.next - 1;
                        // RFC5681 Step 3.2
                        _snd.ssthresh = std::max((flight_size() - _snd.limited_transfer) / 2, 2 * smss);
                        fast_retransmit();
                    } else {
                        // Do not enter fast retransmit and do not reset ssthresh
                    }
                    // RFC5681 Step 3.3
                    _snd.cwnd = _snd.ssthresh + 3 * smss;
                } else if (_snd.dupacks > 3) {
                    // RFC5681 Step 3.4
                    _snd.cwnd += smss;
                    // RFC5681 Step 3.5
                    do_output_data = true;
                }
            } else if (seg_ack > _snd.next) {
                // If the ACK acks something not yet sent (SEG.ACK > SND.NXT)
                // then send an ACK, drop the segment, and return
                return output();
            } else if (_snd.window == 0 && th->window > 0) {
                // The peer re-opened a window that was closed: take the new
                // window and try to send data again.
                update_window();
                do_output_data = true;
            }
        }
        // FIN_WAIT_1 STATE
        if (in_state(FIN_WAIT_1)) {
            // In addition to the processing for the ESTABLISHED state, if
            // our FIN is now acknowledged then enter FIN-WAIT-2 and continue
            // processing in that state.
            if (seg_ack == _snd.next + 1) {
                tcp_debug("ack: FIN_WAIT_1 -> FIN_WAIT_2\n");
                _state = FIN_WAIT_2;
                do_local_fin_acked();
            }
        }
        // FIN_WAIT_2 STATE
        if (in_state(FIN_WAIT_2)) {
            // In addition to the processing for the ESTABLISHED state, if
            // the retransmission queue is empty, the user’s CLOSE can be
            // acknowledged ("ok") but do not delete the TCB.
            // TODO
        }
        // CLOSING STATE
        if (in_state(CLOSING)) {
            if (seg_ack == _snd.next + 1) {
                tcp_debug("ack: CLOSING -> TIME_WAIT\n");
                do_local_fin_acked();
                return do_time_wait();
            } else {
                return;
            }
        }
        // LAST_ACK STATE
        if (in_state(LAST_ACK)) {
            if (seg_ack == _snd.next + 1) {
                tcp_debug("ack: LAST_ACK -> CLOSED\n");
                do_local_fin_acked();
                return do_closed();
            }
        }
        // TIME_WAIT STATE
        if (in_state(TIME_WAIT)) {
            // The only thing that can arrive in this state is a
            // retransmission of the remote FIN. Acknowledge it, and restart
            // the 2 MSL timeout.
            // TODO
        }
    }

    // 4.6 sixth, check the URG bit
    if (th->f_urg) {
        // TODO
    }

    // 4.7 seventh, process the segment text
    if (in_state(ESTABLISHED | FIN_WAIT_1 | FIN_WAIT_2)) {
        if (p.len()) {
            // Once the TCP takes responsibility for the data it advances
            // RCV.NXT over the data accepted, and adjusts RCV.WND as
            // appropriate to the current buffer availability. The total of
            // RCV.NXT and RCV.WND should not be reduced.
            _rcv.data_size += p.len();
            _rcv.data.push_back(std::move(p));
            _rcv.next += seg_len;
            auto merged = merge_out_of_order();
            _rcv.window = get_modified_receive_window_size();
            signal_data_received();
            // Send an acknowledgment of the form:
            // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
            // This acknowledgment should be piggybacked on a segment being
            // transmitted if possible without incurring undue delay.
            if (merged) {
                // TCP receiver SHOULD send an immediate ACK when the
                // incoming segment fills in all or part of a gap in the
                // sequence space.
                do_output = true;
            } else {
                do_output = should_send_ack(seg_len);
            }
        }
    } else if (in_state(CLOSE_WAIT | CLOSING | LAST_ACK | TIME_WAIT)) {
        // This should not occur, since a FIN has been received from the
        // remote side. Ignore the segment text.
        return;
    }

    // 4.8 eighth, check the FIN bit
    if (th->f_fin) {
        // Resolve the FIN-received promise (at most once) before the state
        // checks below.
        if (_fin_recvd_promise) {
            _fin_recvd_promise->set_value();
            _fin_recvd_promise.reset();
        }
        if (in_state(CLOSED | LISTEN | SYN_SENT)) {
            // Do not process the FIN if the state is CLOSED, LISTEN or SYN-SENT
            // since the SEG.SEQ cannot be validated; drop the segment and return.
            return;
        }
        auto fin_seq = seg_seq + seg_len;
        if (fin_seq == _rcv.next) {
            _rcv.next = fin_seq + 1;
            signal_data_received();

            // If this <FIN> packet contains data as well, we can ACK both data
            // and <FIN> in a single packet, so cancel the previous ACK.
            clear_delayed_ack();
            do_output = false;
            // Send ACK for the FIN!
            output();

            if (in_state(SYN_RECEIVED | ESTABLISHED)) {
                tcp_debug("fin: SYN_RECEIVED or ESTABLISHED -> CLOSE_WAIT\n");
                _state = CLOSE_WAIT;
            }
            if (in_state(FIN_WAIT_1)) {
                // If our FIN has been ACKed (perhaps in this segment), then
                // enter TIME-WAIT, start the time-wait timer, turn off the other
                // timers; otherwise enter the CLOSING state.
                // Note: If our FIN has been ACKed, we should be in FIN_WAIT_2
                // not FIN_WAIT_1 if we reach here.
                tcp_debug("fin: FIN_WAIT_1 -> CLOSING\n");
                _state = CLOSING;
            }
            if (in_state(FIN_WAIT_2)) {
                tcp_debug("fin: FIN_WAIT_2 -> TIME_WAIT\n");
                return do_time_wait();
            }
        }
    }
    if (do_output || (do_output_data && can_send())) {
        // Since we will do output, we can cancel scheduled delayed ACK.
        clear_delayed_ack();
        output();
    }
}
1576
1577template <typename InetTraits>
1578packet tcp<InetTraits>::tcb::get_transmit_packet() {
1579 // easy case: empty queue
1580 if (_snd.unsent.empty()) {
1581 return packet();
1582 }
1583 auto can_send = this->can_send();
1584 // Max number of TCP payloads we can pass to NIC
1585 uint32_t len;
1586 if (_tcp.hw_features().tx_tso) {
1587 // FIXME: Info tap device the size of the splitted packet
1588 len = _tcp.hw_features().max_packet_len - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
1589 } else {
1590 len = std::min(uint16_t(_tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
1591 }
1592 can_send = std::min(can_send, len);
1593 // easy case: one small packet
1594 if (_snd.unsent.size() == 1 && _snd.unsent.front().len() <= can_send) {
1595 auto p = std::move(_snd.unsent.front());
1596 _snd.unsent.pop_front();
1597 _snd.unsent_len -= p.len();
1598 return p;
1599 }
1600 // moderate case: need to split one packet
1601 if (_snd.unsent.front().len() > can_send) {
1602 auto p = _snd.unsent.front().share(0, can_send);
1603 _snd.unsent.front().trim_front(can_send);
1604 _snd.unsent_len -= p.len();
1605 return p;
1606 }
1607 // hard case: merge some packets, possibly split last
1608 auto p = std::move(_snd.unsent.front());
1609 _snd.unsent.pop_front();
1610 can_send -= p.len();
1611 while (!_snd.unsent.empty()
1612 && _snd.unsent.front().len() <= can_send) {
1613 can_send -= _snd.unsent.front().len();
1614 p.append(std::move(_snd.unsent.front()));
1615 _snd.unsent.pop_front();
1616 }
1617 if (!_snd.unsent.empty() && can_send) {
1618 auto& q = _snd.unsent.front();
1619 p.append(q.share(0, can_send));
1620 q.trim_front(can_send);
1621 }
1622 _snd.unsent_len -= p.len();
1623 return p;
1624}
1625
// Builds one outgoing segment and hands it to queue_packet(). Does nothing
// in CLOSED state.
//
// When data_retransmit is true the payload is a share of the first
// unacknowledged segment, SEQ is SND.UNA and SND.NXT is left untouched.
// Otherwise the payload comes from get_transmit_packet(), SND.NXT advances by
// its length, and the segment is recorded for retransmission (arming the
// retransmit timer if it is not armed yet).
template <typename InetTraits>
void tcp<InetTraits>::tcb::output_one(bool data_retransmit) {
    if (in_state(CLOSED)) {
        return;
    }

    packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet();
    packet clone = p.share();  // early clone to prevent share() from calling packet::unuse_internal_data() on header.
    uint16_t len = p.len();
    bool syn_on = syn_needs_on();
    bool ack_on = ack_needs_on();

    auto options_size = _option.get_size(syn_on, ack_on);
    auto th = p.prepend_uninitialized_header(tcp_hdr::len + options_size);
    auto h = tcp_hdr{};

    h.src_port = _local_port;
    h.dst_port = _foreign_port;

    h.f_syn = syn_on;
    h.f_ack = ack_on;
    if (ack_on) {
        // This segment carries the ACK, so a pending delayed ACK is moot.
        clear_delayed_ack();
    }
    h.f_urg = false;
    h.f_psh = false;

    tcp_seq seq;
    if (data_retransmit) {
        seq = _snd.unacknowledged;
    } else {
        // A (re)sent SYN always uses the initial sequence number.
        seq = syn_on ? _snd.initial : _snd.next;
        _snd.next += len;
    }
    h.seq = seq;
    h.ack = _rcv.next;
    h.data_offset = (tcp_hdr::len + options_size) / 4;
    h.window = _rcv.window >> _rcv.window_scale;
    h.checksum = 0;

    // FIXME: does the FIN have to fit in the window?
    bool fin_on = fin_needs_on();
    h.f_fin = fin_on;

    // Add tcp options
    _option.fill(th, &h, options_size);
    h.write(th);

    offload_info oi;
    checksummer csum;
    uint16_t pseudo_hdr_seg_len = 0;

    oi.tcp_hdr_len = tcp_hdr::len + options_size;

    if (_tcp.hw_features().tx_csum_l4_offload) {
        oi.needs_csum = true;

        //
        // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's
        // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones'
        // complement sum of the pseudo header.
        //
        // For TSO the csum should be calculated for a pseudo header with
        // segment length set to 0. All the rest is the same as for a TCP Tx
        // CSUM offload case.
        //
        if (_tcp.hw_features().tx_tso && len > _snd.mss) {
            oi.tso_seg_size = _snd.mss;
        } else {
            pseudo_hdr_seg_len = tcp_hdr::len + options_size + len;
        }
    } else {
        pseudo_hdr_seg_len = tcp_hdr::len + options_size + len;
        oi.needs_csum = false;
    }

    InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip,
            pseudo_hdr_seg_len);

    uint16_t checksum;
    if (_tcp.hw_features().tx_csum_l4_offload) {
        // Only the complemented pseudo-header sum is written here.
        checksum = ~csum.get();
    } else {
        // No offload: sum the whole packet in software.
        csum.sum(p);
        checksum = csum.get();
    }
    tcp_hdr::write_nbo_checksum(th, checksum);

    oi.protocol = ip_protocol_num::tcp;

    p.set_offload_info(oi);

    if (!data_retransmit && (len || syn_on || fin_on)) {
        auto now = clock_type::now();
        if (len) {
            // Keep the header-less clone of the payload until it is ACKed.
            unsigned nr_transmits = 0;
            _snd.data.emplace_back(unacked_segment{std::move(clone),
                        len, nr_transmits, now});
        }
        if (!_retransmit.armed()) {
            start_retransmit_timer(now);
        }
    }


    // if advertised TCP receive window is 0 we may only transmit zero window probing segment.
    // Payload size of this segment is 1. Queueing anything bigger when _snd.window == 0 is bug
    // and violation of RFC
    assert((_snd.window > 0) || ((_snd.window == 0) && (len <= 1)));
    queue_packet(std::move(p));
}
1737
1738template <typename InetTraits>
1739future<> tcp<InetTraits>::tcb::wait_for_data() {
1740 if (!_rcv.data.empty() || foreign_will_not_send()) {
1741 return make_ready_future<>();
1742 }
1743 _rcv._data_received_promise = promise<>();
1744 return _rcv._data_received_promise->get_future();
1745}
1746
1e59de90
TL
1747template <typename InetTraits>
1748future<> tcp<InetTraits>::tcb::wait_input_shutdown() {
1749 if (!_fin_recvd_promise) {
1750 return make_ready_future<>();
1751 }
1752 return _fin_recvd_promise->get_future();
1753}
1754
11fdf7f2
TL
1755template <typename InetTraits>
1756void
1e59de90 1757tcp<InetTraits>::tcb::abort_reader() noexcept {
11fdf7f2
TL
1758 if (_rcv._data_received_promise) {
1759 _rcv._data_received_promise->set_exception(
1760 std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
f67539c2 1761 _rcv._data_received_promise = std::nullopt;
11fdf7f2 1762 }
1e59de90
TL
1763 if (_fin_recvd_promise) {
1764 _fin_recvd_promise->set_value();
1765 _fin_recvd_promise.reset();
1766 }
11fdf7f2
TL
1767}
1768
1769template <typename InetTraits>
1770future<> tcp<InetTraits>::tcb::wait_for_all_data_acked() {
1771 if (_snd.data.empty() && _snd.unsent_len == 0) {
1772 return make_ready_future<>();
1773 }
1774 _snd._all_data_acked_promise = promise<>();
1775 return _snd._all_data_acked_promise->get_future();
1776}
1777
1778template <typename InetTraits>
1779void tcp<InetTraits>::tcb::connect() {
1780 // An initial send sequence number (ISS) is selected. A SYN segment of the
1781 // form <SEQ=ISS><CTL=SYN> is sent. Set SND.UNA to ISS, SND.NXT to ISS+1,
1782 // enter SYN-SENT state, and return.
1783 do_setup_isn();
1784
1785 // Local receive window scale factor
1786 _rcv.window_scale = _option._local_win_scale = 7;
1787 // Maximum segment size local can receive
1788 _rcv.mss = _option._local_mss = local_mss();
1789 _rcv.window = get_default_receive_window_size();
1790
1791 do_syn_sent();
1792}
1793
1794template <typename InetTraits>
1795packet tcp<InetTraits>::tcb::read() {
1796 packet p;
1797 for (auto&& q : _rcv.data) {
1798 p.append(std::move(q));
1799 }
1800 _rcv.data_size = 0;
1801 _rcv.data.clear();
1802 _rcv.window = get_default_receive_window_size();
1803 return p;
1804}
1805
1806template <typename InetTraits>
1807future<> tcp<InetTraits>::tcb::wait_send_available() {
1808 if (_snd.max_queue_space > _snd.current_queue_space) {
1809 return make_ready_future<>();
1810 }
1811 _snd._send_available_promise = promise<>();
1812 return _snd._send_available_promise->get_future();
1813}
1814
1815template <typename InetTraits>
1816future<> tcp<InetTraits>::tcb::send(packet p) {
1817 // We can not send after the connection is closed
1818 if (_snd.closed || in_state(CLOSED)) {
1819 return make_exception_future<>(tcp_reset_error());
1820 }
1821
1822 auto len = p.len();
1823 _snd.current_queue_space += len;
1824 _snd.unsent_len += len;
1825 _snd.unsent.push_back(std::move(p));
1826
1827 if (can_send() > 0) {
1828 output();
1829 }
1830
1831 return wait_send_available();
1832}
1833
1834template <typename InetTraits>
1e59de90 1835void tcp<InetTraits>::tcb::close() noexcept {
11fdf7f2
TL
1836 if (in_state(CLOSED) || _snd.closed) {
1837 return;
1838 }
1839 // TODO: We should return a future to upper layer
9f95a23c 1840 (void)wait_for_all_data_acked().then([this, zis = this->shared_from_this()] () mutable {
11fdf7f2
TL
1841 _snd.closed = true;
1842 tcp_debug("close: unsent_len=%d\n", _snd.unsent_len);
1843 if (in_state(CLOSE_WAIT)) {
1844 tcp_debug("close: CLOSE_WAIT -> LAST_ACK\n");
1845 _state = LAST_ACK;
1846 } else if (in_state(ESTABLISHED)) {
1847 tcp_debug("close: ESTABLISHED -> FIN_WAIT_1\n");
1848 _state = FIN_WAIT_1;
1849 }
1850 // Send <FIN> to remote
1851 // Note: we call output_one to make sure a packet with FIN actually
1852 // sent out. If we only call output() and _packetq is not empty,
1853 // tcp::tcb::get_packet(), packet with FIN will not be generated.
1854 output_one();
1855 output();
1856 });
1857}
1858
1859template <typename InetTraits>
1860bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) {
1861 // We've received a TSO packet, do ack immediately
1862 if (seg_len > _rcv.mss) {
1863 _nr_full_seg_received = 0;
1864 _delayed_ack.cancel();
1865 return true;
1866 }
1867
1868 // We've received a full sized segment, ack for every second full sized segment
1869 if (seg_len == _rcv.mss) {
1870 if (_nr_full_seg_received++ >= 1) {
1871 _nr_full_seg_received = 0;
1872 _delayed_ack.cancel();
1873 return true;
1874 }
1875 }
1876
1877 // If the timer is armed and its callback hasn't been run.
1878 if (_delayed_ack.armed()) {
1879 return false;
1880 }
1881
1882 // If the timer is not armed, schedule a delayed ACK.
1883 // The maximum delayed ack timer allowed by RFC1122 is 500ms, most
1884 // implementations use 200ms.
1885 _delayed_ack.arm(200ms);
1886 return false;
1887}
1888
1889template <typename InetTraits>
1e59de90 1890void tcp<InetTraits>::tcb::clear_delayed_ack() noexcept {
11fdf7f2
TL
1891 _delayed_ack.cancel();
1892}
1893
1894template <typename InetTraits>
1895bool tcp<InetTraits>::tcb::merge_out_of_order() {
1896 bool merged = false;
1897 if (_rcv.out_of_order.map.empty()) {
1898 return merged;
1899 }
1900 for (auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) {
1901 auto& p = it->second;
1902 auto seg_beg = it->first;
1903 auto seg_len = p.len();
1904 auto seg_end = seg_beg + seg_len;
1905 if (seg_beg <= _rcv.next && _rcv.next < seg_end) {
1906 // This segment has been received out of order and its previous
1907 // segment has been received now
1908 auto trim = _rcv.next - seg_beg;
1909 if (trim) {
1910 p.trim_front(trim);
1911 seg_len -= trim;
1912 }
1913 _rcv.next += seg_len;
11fdf7f2 1914 _rcv.data_size += p.len();
f67539c2 1915 _rcv.data.push_back(std::move(p));
11fdf7f2
TL
1916 // Since c++11, erase() always returns the value of the following element
1917 it = _rcv.out_of_order.map.erase(it);
1918 merged = true;
1919 } else if (_rcv.next >= seg_end) {
1920 // This segment has been receive already, drop it
1921 it = _rcv.out_of_order.map.erase(it);
1922 } else {
1923 // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only,
1924 // so we can stop looking here.
1925 it++;
1926 break;
1927 }
1928 }
1929 return merged;
1930}
1931
1932template <typename InetTraits>
1933void tcp<InetTraits>::tcb::insert_out_of_order(tcp_seq seg, packet p) {
1934 _rcv.out_of_order.merge(seg, std::move(p));
1935}
1936
1937template <typename InetTraits>
1938void tcp<InetTraits>::tcb::trim_receive_data_after_window() {
1939 abort();
1940}
1941
1942template <typename InetTraits>
1943void tcp<InetTraits>::tcb::persist() {
1944 tcp_debug("persist timer fired\n");
1945 // Send 1 byte packet to probe peer's window size
1946 _snd.window_probe = true;
1947 _snd.zero_window_probing_out++;
1948 output_one();
1949 _snd.window_probe = false;
1950
1951 output();
1952 // Perform binary exponential back-off per RFC1122
1953 _persist_time_out = std::min(_persist_time_out * 2, _rto_max);
1954 start_persist_timer();
1955}
1956
1957template <typename InetTraits>
1958void tcp<InetTraits>::tcb::retransmit() {
1959 auto output_update_rto = [this] {
1960 output();
1961 // According to RFC6298, Update RTO <- RTO * 2 to perform binary exponential back-off
1962 this->_rto = std::min(this->_rto * 2, this->_rto_max);
1963 start_retransmit_timer();
1964 };
1965
1966 // Retransmit SYN
1967 if (syn_needs_on()) {
1968 if (_snd.syn_retransmit++ < _max_nr_retransmit) {
1969 output_update_rto();
1970 } else {
1971 _connect_done.set_exception(tcp_connect_error());
1972 cleanup();
1973 return;
1974 }
1975 }
1976
1977 // Retransmit FIN
1978 if (fin_needs_on()) {
1979 if (_snd.fin_retransmit++ < _max_nr_retransmit) {
1980 output_update_rto();
1981 } else {
1982 cleanup();
1983 return;
1984 }
1985 }
1986
1987 // Retransmit Data
1988 if (_snd.data.empty()) {
1989 return;
1990 }
1991
1992 // If there are unacked data, retransmit the earliest segment
1993 auto& unacked_seg = _snd.data.front();
1994
1995 // According to RFC5681
1996 // Update ssthresh only for the first retransmit
1997 uint32_t smss = _snd.mss;
1998 if (unacked_seg.nr_transmits == 0) {
1999 _snd.ssthresh = std::max(flight_size() / 2, 2 * smss);
2000 }
2001 // RFC6582 Step 4
2002 _snd.recover = _snd.next - 1;
2003 // Start the slow start process
2004 _snd.cwnd = smss;
2005 // End fast recovery
2006 exit_fast_recovery();
2007
2008 if (unacked_seg.nr_transmits < _max_nr_retransmit) {
2009 unacked_seg.nr_transmits++;
2010 } else {
2011 // Delete connection when max num of retransmission is reached
f67539c2 2012 do_reset();
11fdf7f2
TL
2013 return;
2014 }
2015 retransmit_one();
2016
2017 output_update_rto();
2018}
2019
2020template <typename InetTraits>
2021void tcp<InetTraits>::tcb::fast_retransmit() {
2022 if (!_snd.data.empty()) {
2023 auto& unacked_seg = _snd.data.front();
2024 unacked_seg.nr_transmits++;
2025 retransmit_one();
2026 output();
2027 }
2028}
2029
2030template <typename InetTraits>
2031void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) {
2032 // Update RTO according to RFC6298
2033 auto R = std::chrono::duration_cast<std::chrono::milliseconds>(clock_type::now() - tx_time);
2034 if (_snd.first_rto_sample) {
2035 _snd.first_rto_sample = false;
2036 // RTTVAR <- R/2
2037 // SRTT <- R
2038 _snd.rttvar = R / 2;
2039 _snd.srtt = R;
2040 } else {
2041 // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
2042 // SRTT <- (1 - alpha) * SRTT + alpha * R'
2043 // where alpha = 1/8 and beta = 1/4
2044 auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt);
2045 _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4;
2046 _snd.srtt = _snd.srtt * 7 / 8 + R / 8;
2047 }
2048 // RTO <- SRTT + max(G, K * RTTVAR)
2049 _rto = _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar);
2050
2051 // Make sure 1 sec << _rto << 60 sec
2052 _rto = std::max(_rto, _rto_min);
2053 _rto = std::min(_rto, _rto_max);
2054}
2055
2056template <typename InetTraits>
2057void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) {
2058 uint32_t smss = _snd.mss;
2059 if (_snd.cwnd < _snd.ssthresh) {
2060 // In slow start phase
2061 _snd.cwnd += std::min(acked_bytes, smss);
2062 } else {
2063 // In congestion avoidance phase
2064 uint32_t round_up = 1;
2065 _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd);
2066 }
2067}
2068
2069template <typename InetTraits>
2070void tcp<InetTraits>::tcb::cleanup() {
2071 _snd.unsent.clear();
2072 _snd.data.clear();
2073 _rcv.out_of_order.map.clear();
2074 _rcv.data_size = 0;
2075 _rcv.data.clear();
2076 stop_retransmit_timer();
2077 clear_delayed_ack();
2078 remove_from_tcbs();
2079}
2080
2081template <typename InetTraits>
2082tcp_seq tcp<InetTraits>::tcb::get_isn() {
2083 // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers
2084 // with the expression:
2085 // ISN = M + F(localip, localport, remoteip, remoteport, secretkey)
2086 // M is the 4 microsecond timer
2087 using namespace std::chrono;
2088 uint32_t hash[4];
2089 hash[0] = _local_ip.ip;
2090 hash[1] = _foreign_ip.ip;
2091 hash[2] = (_local_port << 16) + _foreign_port;
2092 hash[3] = _isn_secret.key[15];
2093 CryptoPP::Weak::MD5::Transform(hash, _isn_secret.key);
2094 auto seq = hash[0];
2095 auto m = duration_cast<microseconds>(clock_type::now().time_since_epoch());
2096 seq += m.count() / 4;
2097 return make_seq(seq);
2098}
2099
2100template <typename InetTraits>
f67539c2 2101std::optional<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
11fdf7f2
TL
2102 _poll_active = false;
2103 if (_packetq.empty()) {
2104 output_one();
2105 }
2106
2107 if (in_state(CLOSED)) {
f67539c2 2108 return std::optional<typename InetTraits::l4packet>();
11fdf7f2
TL
2109 }
2110
2111 assert(!_packetq.empty());
2112
2113 auto p = std::move(_packetq.front());
2114 _packetq.pop_front();
2115 if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0 && (_snd.window > 0))) {
2116 // If there are packets to send in the queue or tcb is allowed to send
2117 // more add tcp back to polling set to keep sending. In addition, dupacks >= 3
2118 // is an indication that an segment is lost, stop sending more in this case.
2119 // Finally - we can't send more until window is opened again.
2120 output();
2121 }
f67539c2 2122 return p;
11fdf7f2
TL
2123}
2124
2125template <typename InetTraits>
1e59de90 2126void tcp<InetTraits>::connection::close_read() noexcept {
11fdf7f2
TL
2127 _tcb->abort_reader();
2128}
2129
2130template <typename InetTraits>
1e59de90 2131void tcp<InetTraits>::connection::close_write() noexcept {
11fdf7f2
TL
2132 _tcb->close();
2133}
2134
2135template <typename InetTraits>
2136void tcp<InetTraits>::connection::shutdown_connect() {
2137 if (_tcb->syn_needs_on()) {
2138 _tcb->_connect_done.set_exception(tcp_refused_error());
2139 _tcb->cleanup();
2140 } else {
2141 close_read();
2142 close_write();
2143 }
2144}
2145
2146template <typename InetTraits>
2147constexpr uint16_t tcp<InetTraits>::tcb::_max_nr_retransmit;
2148
2149template <typename InetTraits>
2150constexpr std::chrono::milliseconds tcp<InetTraits>::tcb::_rto_min;
2151
2152template <typename InetTraits>
2153constexpr std::chrono::milliseconds tcp<InetTraits>::tcb::_rto_max;
2154
2155template <typename InetTraits>
2156constexpr std::chrono::milliseconds tcp<InetTraits>::tcb::_rto_clk_granularity;
2157
2158template <typename InetTraits>
2159typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret;
2160
2161}
2162
2163}