]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/TCP.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / dpdk / TCP.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
22
23 #ifndef CEPH_DPDK_TCP_H_
24 #define CEPH_DPDK_TCP_H_
25
26 #include <unordered_map>
27 #include <map>
28 #include <queue>
29 #include <functional>
30 #include <deque>
31 #include <chrono>
32 #include <stdexcept>
33 #include <system_error>
34
35 #include "msg/async/dpdk/EventDPDK.h"
36
37 #include "include/utime.h"
38 #include "common/Throttle.h"
39 #include "common/ceph_time.h"
40 #include "common/ceph_crypto.h"
41 #include "msg/async/Event.h"
42 #include "IPChecksum.h"
43 #include "IP.h"
44 #include "const.h"
45 #include "byteorder.h"
46 #include "shared_ptr.h"
47 #include "PacketUtil.h"
48
49 #include "include/random.h"
50
51 struct tcp_hdr;
52
// TCP connection states. Every enumerator occupies its own bit, so several
// states can be merged into a mask with operator| below.
enum class tcp_state : uint16_t {
  CLOSED       = 1 << 0,
  LISTEN       = 1 << 1,
  SYN_SENT     = 1 << 2,
  SYN_RECEIVED = 1 << 3,
  ESTABLISHED  = 1 << 4,
  FIN_WAIT_1   = 1 << 5,
  FIN_WAIT_2   = 1 << 6,
  CLOSE_WAIT   = 1 << 7,
  CLOSING      = 1 << 8,
  LAST_ACK     = 1 << 9,
  TIME_WAIT    = 1 << 10
};

// Returns the bitwise union of two states (or state masks).
inline tcp_state operator|(tcp_state s1, tcp_state s2) {
  const auto lhs = static_cast<uint16_t>(s1);
  const auto rhs = static_cast<uint16_t>(s2);
  return static_cast<tcp_state>(static_cast<uint16_t>(lhs | rhs));
}

// Prints the symbolic name of a single state; any value that is not exactly
// one enumerator (for example a mask of several states) prints "UNKNOWN".
inline std::ostream & operator<<(std::ostream & str, const tcp_state& s) {
  const char* name = "UNKNOWN";
  switch (s) {
  case tcp_state::CLOSED:       name = "CLOSED"; break;
  case tcp_state::LISTEN:       name = "LISTEN"; break;
  case tcp_state::SYN_SENT:     name = "SYN_SENT"; break;
  case tcp_state::SYN_RECEIVED: name = "SYN_RECEIVED"; break;
  case tcp_state::ESTABLISHED:  name = "ESTABLISHED"; break;
  case tcp_state::FIN_WAIT_1:   name = "FIN_WAIT_1"; break;
  case tcp_state::FIN_WAIT_2:   name = "FIN_WAIT_2"; break;
  case tcp_state::CLOSE_WAIT:   name = "CLOSE_WAIT"; break;
  case tcp_state::CLOSING:      name = "CLOSING"; break;
  case tcp_state::LAST_ACK:     name = "LAST_ACK"; break;
  case tcp_state::TIME_WAIT:    name = "TIME_WAIT"; break;
  default: break;
  }
  return str << name;
}
87
88 struct tcp_option {
89 // The kind and len field are fixed and defined in TCP protocol
90 enum class option_kind: uint8_t { mss = 2, win_scale = 3, sack = 4, timestamps = 8, nop = 1, eol = 0 };
91 enum class option_len: uint8_t { mss = 4, win_scale = 3, sack = 2, timestamps = 10, nop = 1, eol = 1 };
92 struct mss {
93 option_kind kind = option_kind::mss;
94 option_len len = option_len::mss;
95 uint16_t mss;
96 struct mss hton() {
97 struct mss m = *this;
98 m.mss = ::hton(m.mss);
99 return m;
100 }
101 } __attribute__((packed));
102 struct win_scale {
103 option_kind kind = option_kind::win_scale;
104 option_len len = option_len::win_scale;
105 uint8_t shift;
106 } __attribute__((packed));
107 struct sack {
108 option_kind kind = option_kind::sack;
109 option_len len = option_len::sack;
110 } __attribute__((packed));
111 struct timestamps {
112 option_kind kind = option_kind::timestamps;
113 option_len len = option_len::timestamps;
114 uint32_t t1;
115 uint32_t t2;
116 } __attribute__((packed));
117 struct nop {
118 option_kind kind = option_kind::nop;
119 } __attribute__((packed));
120 struct eol {
121 option_kind kind = option_kind::eol;
122 } __attribute__((packed));
123 static const uint8_t align = 4;
124
125 void parse(uint8_t* beg, uint8_t* end);
126 uint8_t fill(tcp_hdr* th, uint8_t option_size);
127 uint8_t get_size(bool syn_on, bool ack_on);
128
129 // For option negotiattion
130 bool _mss_received = false;
131 bool _win_scale_received = false;
132 bool _timestamps_received = false;
133 bool _sack_received = false;
134
135 // Option data
136 uint16_t _remote_mss = 536;
137 uint16_t _local_mss;
138 uint8_t _remote_win_scale = 0;
139 uint8_t _local_win_scale = 0;
140 };
141 inline uint8_t*& operator+=(uint8_t*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
142 inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
143
144 struct tcp_sequence {
145 uint32_t raw;
146 };
147
148 tcp_sequence ntoh(tcp_sequence ts) {
149 return tcp_sequence { ::ntoh(ts.raw) };
150 }
151
152 tcp_sequence hton(tcp_sequence ts) {
153 return tcp_sequence { ::hton(ts.raw) };
154 }
155
156 inline std::ostream& operator<<(std::ostream& os, const tcp_sequence& s) {
157 return os << s.raw;
158 }
159
160 inline tcp_sequence make_seq(uint32_t raw) { return tcp_sequence{raw}; }
161 inline tcp_sequence& operator+=(tcp_sequence& s, int32_t n) { s.raw += n; return s; }
162 inline tcp_sequence& operator-=(tcp_sequence& s, int32_t n) { s.raw -= n; return s; }
163 inline tcp_sequence operator+(tcp_sequence s, int32_t n) { return s += n; }
164 inline tcp_sequence operator-(tcp_sequence s, int32_t n) { return s -= n; }
165 inline int32_t operator-(tcp_sequence s, tcp_sequence q) { return s.raw - q.raw; }
166 inline bool operator==(tcp_sequence s, tcp_sequence q) { return s.raw == q.raw; }
167 inline bool operator!=(tcp_sequence s, tcp_sequence q) { return !(s == q); }
168 inline bool operator<(tcp_sequence s, tcp_sequence q) { return s - q < 0; }
169 inline bool operator>(tcp_sequence s, tcp_sequence q) { return q < s; }
170 inline bool operator<=(tcp_sequence s, tcp_sequence q) { return !(s > q); }
171 inline bool operator>=(tcp_sequence s, tcp_sequence q) { return !(s < q); }
172
173 struct tcp_hdr {
174 uint16_t src_port;
175 uint16_t dst_port;
176 tcp_sequence seq;
177 tcp_sequence ack;
178 uint8_t rsvd1 : 4;
179 uint8_t data_offset : 4;
180 uint8_t f_fin : 1;
181 uint8_t f_syn : 1;
182 uint8_t f_rst : 1;
183 uint8_t f_psh : 1;
184 uint8_t f_ack : 1;
185 uint8_t f_urg : 1;
186 uint8_t rsvd2 : 2;
187 uint16_t window;
188 uint16_t checksum;
189 uint16_t urgent;
190
191 tcp_hdr hton() {
192 tcp_hdr hdr = *this;
193 hdr.src_port = ::hton(src_port);
194 hdr.dst_port = ::hton(dst_port);
195 hdr.seq = ::hton(seq);
196 hdr.ack = ::hton(ack);
197 hdr.window = ::hton(window);
198 hdr.checksum = ::hton(checksum);
199 hdr.urgent = ::hton(urgent);
200 return hdr;
201 }
202
203 tcp_hdr ntoh() {
204 tcp_hdr hdr = *this;
205 hdr.src_port = ::ntoh(src_port);
206 hdr.dst_port = ::ntoh(dst_port);
207 hdr.seq = ::ntoh(seq);
208 hdr.ack = ::ntoh(ack);
209 hdr.window = ::ntoh(window);
210 hdr.checksum = ::ntoh(checksum);
211 hdr.urgent = ::ntoh(urgent);
212 return hdr;
213 }
214 } __attribute__((packed));
215
216 struct tcp_tag {};
217 using tcp_packet_merger = packet_merger<tcp_sequence, tcp_tag>;
218
219 template <typename InetTraits>
220 class tcp {
221 public:
222 using ipaddr = typename InetTraits::address_type;
223 using inet_type = typename InetTraits::inet_type;
224 using connid = l4connid<InetTraits>;
225 using connid_hash = typename connid::connid_hash;
226 class connection;
227 class listener;
228 private:
229 class tcb;
230
231 class C_handle_delayed_ack : public EventCallback {
232 tcb *tc;
233
234 public:
235 C_handle_delayed_ack(tcb *t): tc(t) { }
236 void do_request(uint64_t r) {
237 tc->_delayed_ack_fd.reset();
238 tc->_nr_full_seg_received = 0;
239 tc->output();
240 }
241 };
242
243 class C_handle_retransmit : public EventCallback {
244 tcb *tc;
245
246 public:
247 C_handle_retransmit(tcb *t): tc(t) { }
248 void do_request(uint64_t r) {
249 tc->retransmit_fd.reset();
250 tc->retransmit();
251 }
252 };
253
254 class C_handle_persist : public EventCallback {
255 tcb *tc;
256
257 public:
258 C_handle_persist(tcb *t): tc(t) { }
259 void do_request(uint64_t r) {
260 tc->persist_fd.reset();
261 tc->persist();
262 }
263 };
264
265 class C_all_data_acked : public EventCallback {
266 tcb *tc;
267
268 public:
269 C_all_data_acked(tcb *t): tc(t) {}
270 void do_request(uint64_t fd_or_id) {
271 tc->close_final_cleanup();
272 }
273 };
274
275 class C_actual_remove_tcb : public EventCallback {
276 lw_shared_ptr<tcb> tc;
277 public:
278 C_actual_remove_tcb(tcb *t): tc(t->shared_from_this()) {}
279 void do_request(uint64_t r) {
280 delete this;
281 }
282 };
283
284 class tcb : public enable_lw_shared_from_this<tcb> {
285 using clock_type = ceph::coarse_real_clock;
286 static constexpr tcp_state CLOSED = tcp_state::CLOSED;
287 static constexpr tcp_state LISTEN = tcp_state::LISTEN;
288 static constexpr tcp_state SYN_SENT = tcp_state::SYN_SENT;
289 static constexpr tcp_state SYN_RECEIVED = tcp_state::SYN_RECEIVED;
290 static constexpr tcp_state ESTABLISHED = tcp_state::ESTABLISHED;
291 static constexpr tcp_state FIN_WAIT_1 = tcp_state::FIN_WAIT_1;
292 static constexpr tcp_state FIN_WAIT_2 = tcp_state::FIN_WAIT_2;
293 static constexpr tcp_state CLOSE_WAIT = tcp_state::CLOSE_WAIT;
294 static constexpr tcp_state CLOSING = tcp_state::CLOSING;
295 static constexpr tcp_state LAST_ACK = tcp_state::LAST_ACK;
296 static constexpr tcp_state TIME_WAIT = tcp_state::TIME_WAIT;
297 tcp_state _state = CLOSED;
298 tcp& _tcp;
299 UserspaceEventManager &manager;
300 connection* _conn = nullptr;
301 bool _connect_done = false;
302 ipaddr _local_ip;
303 ipaddr _foreign_ip;
304 uint16_t _local_port;
305 uint16_t _foreign_port;
306 struct unacked_segment {
307 Packet p;
308 uint16_t data_len;
309 unsigned nr_transmits;
310 clock_type::time_point tx_time;
311 };
312 struct send {
313 tcp_sequence unacknowledged;
314 tcp_sequence next;
315 uint32_t window;
316 uint8_t window_scale;
317 uint16_t mss;
318 tcp_sequence urgent;
319 tcp_sequence wl1;
320 tcp_sequence wl2;
321 tcp_sequence initial;
322 std::deque<unacked_segment> data;
323 std::deque<Packet> unsent;
324 uint32_t unsent_len = 0;
325 uint32_t queued_len = 0;
326 bool closed = false;
327 // Event fd notified once all data has been acked
328 int _all_data_acked_fd = -1;
329 // Limit number of data queued into send queue
330 Throttle user_queue_space;
331 // Round-trip time variation
332 std::chrono::microseconds rttvar;
333 // Smoothed round-trip time
334 std::chrono::microseconds srtt;
335 bool first_rto_sample = true;
336 clock_type::time_point syn_tx_time;
337 // Congestion window
338 uint32_t cwnd;
339 // Slow start threshold
340 uint32_t ssthresh;
341 // Duplicated ACKs
342 uint16_t dupacks = 0;
343 unsigned syn_retransmit = 0;
344 unsigned fin_retransmit = 0;
345 uint32_t limited_transfer = 0;
346 uint32_t partial_ack = 0;
347 tcp_sequence recover;
348 bool window_probe = false;
349 send(CephContext *c): user_queue_space(c, "DPDK::tcp::tcb::user_queue_space", 81920) {}
350 } _snd;
351 struct receive {
352 tcp_sequence next;
353 uint32_t window;
354 uint8_t window_scale;
355 uint16_t mss;
356 tcp_sequence urgent;
357 tcp_sequence initial;
358 std::deque<Packet> data;
359 tcp_packet_merger out_of_order;
360 } _rcv;
361 EventCenter *center;
362 int fd;
363 // positive means no error, 0 means EOF, negative means error
364 int16_t _errno = 1;
365 tcp_option _option;
366 EventCallbackRef delayed_ack_event;
367 std::optional<uint64_t> _delayed_ack_fd;
368 // Retransmission timeout
369 std::chrono::microseconds _rto{1000*1000};
370 std::chrono::microseconds _persist_time_out{1000*1000};
371 static constexpr std::chrono::microseconds _rto_min{1000*1000};
372 static constexpr std::chrono::microseconds _rto_max{60000*1000};
373 // Clock granularity
374 static constexpr std::chrono::microseconds _rto_clk_granularity{1000};
375 static constexpr uint16_t _max_nr_retransmit{5};
376 EventCallbackRef retransmit_event;
377 std::optional<uint64_t> retransmit_fd;
378 EventCallbackRef persist_event;
379 EventCallbackRef all_data_ack_event;
380 std::optional<uint64_t> persist_fd;
381 uint16_t _nr_full_seg_received = 0;
382 struct isn_secret {
383 // 512 bits secretkey for ISN generating
384 uint32_t key[16];
385 isn_secret () {
386 for (auto& k : key) {
387 k = ceph::util::generate_random_number<uint32_t>(0, std::numeric_limits<uint32_t>::max());
388 }
389 }
390 };
391 static isn_secret _isn_secret;
392 tcp_sequence get_isn();
393 circular_buffer<typename InetTraits::l4packet> _packetq;
394 bool _poll_active = false;
395 public:
396 // callback
397 void close_final_cleanup();
398 std::ostream& _prefix(std::ostream *_dout);
399
400 public:
401 tcb(tcp& t, connid id);
402 ~tcb();
403 void input_handle_listen_state(tcp_hdr* th, Packet p);
404 void input_handle_syn_sent_state(tcp_hdr* th, Packet p);
405 void input_handle_other_state(tcp_hdr* th, Packet p);
406 void output_one(bool data_retransmit = false);
407 bool is_all_data_acked();
408 int send(Packet p);
409 void connect();
410 std::optional<Packet> read();
411 void close();
412 void remove_from_tcbs() {
413 auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
414 _tcp._tcbs.erase(id);
415 }
416 std::optional<typename InetTraits::l4packet> get_packet();
417 void output() {
418 if (!_poll_active) {
419 _poll_active = true;
420
421 auto tcb = this->shared_from_this();
422 _tcp._inet.wait_l2_dst_address(_foreign_ip, Packet(), [tcb] (const ethernet_address &dst, Packet p, int r) {
423 if (r == 0) {
424 tcb->_tcp.poll_tcb(dst, std::move(tcb));
425 } else if (r == -ETIMEDOUT) {
426 // in other states connection should time out
427 if (tcb->in_state(SYN_SENT)) {
428 tcb->_errno = -ETIMEDOUT;
429 tcb->cleanup();
430 }
431 } else if (r == -EBUSY) {
432 // retry later
433 tcb->_poll_active = false;
434 tcb->start_retransmit_timer();
435 }
436 });
437 }
438 }
439
440 int16_t get_errno() const {
441 return _errno;
442 }
443
444 tcp_state& state() {
445 return _state;
446 }
447
448 uint64_t peek_sent_available() {
449 if (!in_state(ESTABLISHED))
450 return 0;
451 uint64_t left = _snd.user_queue_space.get_max() - _snd.user_queue_space.get_current();
452 return left;
453 }
454
455 int is_connected() const {
456 if (_errno <= 0)
457 return _errno;
458 return _connect_done;
459 }
460
461 private:
462 void respond_with_reset(tcp_hdr* th);
463 bool merge_out_of_order();
464 void insert_out_of_order(tcp_sequence seq, Packet p);
465 void trim_receive_data_after_window();
466 bool should_send_ack(uint16_t seg_len);
467 void clear_delayed_ack();
468 Packet get_transmit_packet();
469 void retransmit_one() {
470 bool data_retransmit = true;
471 output_one(data_retransmit);
472 }
473 void start_retransmit_timer() {
474 if (retransmit_fd)
475 center->delete_time_event(*retransmit_fd);
476 retransmit_fd.emplace(center->create_time_event(_rto.count(), retransmit_event));
477 };
478 void stop_retransmit_timer() {
479 if (retransmit_fd) {
480 center->delete_time_event(*retransmit_fd);
481 retransmit_fd.reset();
482 }
483 };
484 void start_persist_timer() {
485 if (persist_fd)
486 center->delete_time_event(*persist_fd);
487 persist_fd.emplace(center->create_time_event(_persist_time_out.count(), persist_event));
488 };
489 void stop_persist_timer() {
490 if (persist_fd) {
491 center->delete_time_event(*persist_fd);
492 persist_fd.reset();
493 }
494 };
495 void persist();
496 void retransmit();
497 void fast_retransmit();
498 void update_rto(clock_type::time_point tx_time);
499 void update_cwnd(uint32_t acked_bytes);
500 void cleanup();
501 uint32_t can_send() {
502 if (_snd.window_probe) {
503 return 1;
504 }
505 // Can not send more than advertised window allows
506 auto x = std::min(uint32_t(_snd.unacknowledged + _snd.window - _snd.next), _snd.unsent_len);
507 // Can not send more than congestion window allows
508 x = std::min(_snd.cwnd, x);
509 if (_snd.dupacks == 1 || _snd.dupacks == 2) {
510 // RFC5681 Step 3.1
511 // Send cwnd + 2 * smss per RFC3042
512 auto flight = flight_size();
513 auto max = _snd.cwnd + 2 * _snd.mss;
514 x = flight <= max ? std::min(x, max - flight) : 0;
515 _snd.limited_transfer += x;
516 } else if (_snd.dupacks >= 3) {
517 // RFC5681 Step 3.5
518 // Sent 1 full-sized segment at most
519 x = std::min(uint32_t(_snd.mss), x);
520 }
521 return x;
522 }
523 uint32_t flight_size() {
524 uint32_t size = 0;
525 std::for_each(_snd.data.begin(), _snd.data.end(),
526 [&] (unacked_segment& seg) { size += seg.p.len(); });
527 return size;
528 }
529 uint16_t local_mss() {
530 return _tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
531 }
532 void queue_packet(Packet p) {
533 _packetq.emplace_back(
534 typename InetTraits::l4packet{_foreign_ip, std::move(p)});
535 }
536 void signal_data_received() {
537 manager.notify(fd, EVENT_READABLE);
538 }
539 void signal_all_data_acked() {
540 if (_snd._all_data_acked_fd >= 0 && _snd.unsent_len == 0 && _snd.queued_len == 0)
541 manager.notify(_snd._all_data_acked_fd, EVENT_READABLE);
542 }
543 void do_syn_sent() {
544 _state = SYN_SENT;
545 _snd.syn_tx_time = clock_type::now();
546 // Send <SYN> to remote
547 output();
548 }
549 void do_syn_received() {
550 _state = SYN_RECEIVED;
551 _snd.syn_tx_time = clock_type::now();
552 // Send <SYN,ACK> to remote
553 output();
554 }
555 void do_established() {
556 _state = ESTABLISHED;
557 update_rto(_snd.syn_tx_time);
558 _connect_done = true;
559 manager.notify(fd, EVENT_READABLE|EVENT_WRITABLE);
560 }
561 void do_reset() {
562 _state = CLOSED;
563 // Free packets to be sent which are waiting for user_queue_space
564 _snd.user_queue_space.reset();
565 cleanup();
566 _errno = -ECONNRESET;
567 manager.notify(fd, EVENT_READABLE);
568
569 if (_snd._all_data_acked_fd >= 0)
570 manager.notify(_snd._all_data_acked_fd, EVENT_READABLE);
571 }
572 void do_time_wait() {
573 // FIXME: Implement TIME_WAIT state timer
574 _state = TIME_WAIT;
575 cleanup();
576 }
577 void do_closed() {
578 _state = CLOSED;
579 cleanup();
580 }
581 void do_setup_isn() {
582 _snd.initial = get_isn();
583 _snd.unacknowledged = _snd.initial;
584 _snd.next = _snd.initial + 1;
585 _snd.recover = _snd.initial;
586 }
587 void do_local_fin_acked() {
588 _snd.unacknowledged += 1;
589 _snd.next += 1;
590 }
591 bool syn_needs_on() {
592 return in_state(SYN_SENT | SYN_RECEIVED);
593 }
594 bool fin_needs_on() {
595 return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed &&
596 _snd.unsent_len == 0 && _snd.queued_len == 0;
597 }
598 bool ack_needs_on() {
599 return !in_state(CLOSED | LISTEN | SYN_SENT);
600 }
601 bool foreign_will_not_send() {
602 return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED);
603 }
604 bool in_state(tcp_state state) {
605 return uint16_t(_state) & uint16_t(state);
606 }
607 void exit_fast_recovery() {
608 _snd.dupacks = 0;
609 _snd.limited_transfer = 0;
610 _snd.partial_ack = 0;
611 }
612 uint32_t data_segment_acked(tcp_sequence seg_ack);
613 bool segment_acceptable(tcp_sequence seg_seq, unsigned seg_len);
614 void init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end);
615 friend class connection;
616
617 friend class C_handle_delayed_ack;
618 friend class C_handle_retransmit;
619 friend class C_handle_persist;
620 friend class C_all_data_acked;
621 };
622
623 CephContext *cct;
624 // ipv4_l4<ip_protocol_num::tcp>
625 inet_type& _inet;
626 EventCenter *center;
627 UserspaceEventManager &manager;
628 std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs;
629 std::unordered_map<uint16_t, listener*> _listening;
630 std::random_device _rd;
631 std::default_random_engine _e;
632 std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
633 circular_buffer<std::pair<lw_shared_ptr<tcb>, ethernet_address>> _poll_tcbs;
634 // queue for packets that do not belong to any tcb
635 circular_buffer<ipv4_traits::l4packet> _packetq;
636 Throttle _queue_space;
637 // Limit number of data queued into send queue
638 public:
639 class connection {
640 lw_shared_ptr<tcb> _tcb;
641 public:
642 explicit connection(lw_shared_ptr<tcb> tcbp) : _tcb(std::move(tcbp)) { _tcb->_conn = this; }
643 connection(const connection&) = delete;
644 connection(connection&& x) noexcept : _tcb(std::move(x._tcb)) {
645 _tcb->_conn = this;
646 }
647 ~connection();
648 void operator=(const connection&) = delete;
649 connection& operator=(connection&& x) {
650 if (this != &x) {
651 this->~connection();
652 new (this) connection(std::move(x));
653 }
654 return *this;
655 }
656 int fd() const {
657 return _tcb->fd;
658 }
659 int send(Packet p) {
660 return _tcb->send(std::move(p));
661 }
662 std::optional<Packet> read() {
663 return _tcb->read();
664 }
665 int16_t get_errno() const {
666 return _tcb->get_errno();
667 }
668 void close_read();
669 void close_write();
670 entity_addr_t remote_addr() const {
671 entity_addr_t addr;
672 auto net_ip = _tcb->_foreign_ip.hton();
673 memcpy((void*)&addr.in4_addr().sin_addr.s_addr,
674 &net_ip, sizeof(addr.in4_addr().sin_addr.s_addr));
675 addr.set_family(AF_INET);
676 return addr;
677 }
678 uint64_t peek_sent_available() {
679 return _tcb->peek_sent_available();
680 }
681 int is_connected() const { return _tcb->is_connected(); }
682 };
683 class listener {
684 tcp& _tcp;
685 uint16_t _port;
686 int _fd = -1;
687 int16_t _errno;
688 std::queue<connection> _q;
689 size_t _q_max_length;
690
691 private:
692 listener(tcp& t, uint16_t port, size_t queue_length)
693 : _tcp(t), _port(port), _errno(0), _q(), _q_max_length(queue_length) {
694 }
695 public:
696 listener(const listener&) = delete;
697 void operator=(const listener&) = delete;
698 listener(listener&& x)
699 : _tcp(x._tcp), _port(x._port), _fd(std::move(x._fd)), _errno(x._errno),
700 _q(std::move(x._q)) {
701 if (_fd >= 0)
702 _tcp._listening[_port] = this;
703 }
704 ~listener() {
705 abort_accept();
706 }
707 int listen() {
708 if (_tcp._listening.find(_port) != _tcp._listening.end())
709 return -EADDRINUSE;
710 _tcp._listening.emplace(_port, this);
711 _fd = _tcp.manager.get_eventfd();
712 return 0;
713 }
714 std::optional<connection> accept() {
715 std::optional<connection> c;
716 if (!_q.empty()) {
717 c = std::move(_q.front());
718 _q.pop();
719 }
720 return c;
721 }
722 void abort_accept() {
723 while (!_q.empty())
724 _q.pop();
725 if (_fd >= 0) {
726 _tcp._listening.erase(_port);
727 _tcp.manager.close(_fd);
728 _fd = -1;
729 }
730 }
731 int16_t get_errno() const {
732 return _errno;
733 }
734 bool full() const {
735 return _q.size() == _q_max_length;
736 }
737 int fd() const {
738 return _fd;
739 }
740 friend class tcp;
741 };
742 public:
743 explicit tcp(CephContext *c, inet_type& inet, EventCenter *cen);
744 void received(Packet p, ipaddr from, ipaddr to);
745 bool forward(forward_hash& out_hash_data, Packet& p, size_t off);
746 listener listen(uint16_t port, size_t queue_length = 100);
747 connection connect(const entity_addr_t &addr);
748 const hw_features& get_hw_features() const { return _inet._inet.get_hw_features(); }
749 void poll_tcb(const ethernet_address &dst, lw_shared_ptr<tcb> tcb) {
750 _poll_tcbs.emplace_back(std::move(tcb), dst);
751 }
752 bool push_listen_queue(uint16_t port, tcb *t) {
753 auto listener = _listening.find(port);
754 if (listener == _listening.end() || listener->second->full()) {
755 return false;
756 }
757 listener->second->_q.push(connection(t->shared_from_this()));
758 manager.notify(listener->second->_fd, EVENT_READABLE);
759 return true;
760 }
761
762 private:
763 void send_packet_without_tcb(ipaddr from, ipaddr to, Packet p);
764 void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
765 friend class listener;
766 };
767
768 template <typename InetTraits>
769 tcp<InetTraits>::tcp(CephContext *c, inet_type& inet, EventCenter *cen)
770 : cct(c), _inet(inet), center(cen),
771 manager(static_cast<DPDKDriver*>(cen->get_driver())->manager),
772 _e(_rd()), _queue_space(cct, "DPDK::tcp::queue_space", 81920) {
773 int tcb_polled = 0u;
774 _inet.register_packet_provider([this, tcb_polled] () mutable {
775 std::optional<typename InetTraits::l4packet> l4p;
776 auto c = _poll_tcbs.size();
777 if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
778 l4p = std::move(_packetq.front());
779 _packetq.pop_front();
780 _queue_space.put(l4p->p.len());
781 } else {
782 while (c--) {
783 tcb_polled++;
784 lw_shared_ptr<tcb> tcb;
785 ethernet_address dst;
786 std::tie(tcb, dst) = std::move(_poll_tcbs.front());
787 _poll_tcbs.pop_front();
788 l4p = std::move(tcb->get_packet());
789 if (l4p) {
790 l4p->e_dst = dst;
791 break;
792 }
793 }
794 }
795 return l4p;
796 });
797 }
798
799 template <typename InetTraits>
800 auto tcp<InetTraits>::listen(uint16_t port, size_t queue_length) -> listener {
801 return listener(*this, port, queue_length);
802 }
803
804 template <typename InetTraits>
805 typename tcp<InetTraits>::connection tcp<InetTraits>::connect(const entity_addr_t &addr) {
806 uint16_t src_port;
807 connid id;
808 auto src_ip = _inet._inet.host_address();
809 auto dst_ip = ipv4_address(addr);
810 auto dst_port = addr.get_port();
811
812 do {
813 src_port = _port_dist(_e);
814 id = connid{src_ip, dst_ip, src_port, (uint16_t)dst_port};
815 if (_tcbs.find(id) == _tcbs.end()) {
816 if (_inet._inet.netif()->hw_queues_count() == 1 ||
817 _inet._inet.netif()->hash2cpu(
818 id.hash(_inet._inet.netif()->rss_key())) == center->get_id())
819 break;
820 }
821 } while (true);
822
823 auto tcbp = make_lw_shared<tcb>(*this, id);
824 _tcbs.insert({id, tcbp});
825 tcbp->connect();
826 return connection(tcbp);
827 }
828
829 template <typename InetTraits>
830 bool tcp<InetTraits>::forward(forward_hash& out_hash_data, Packet& p, size_t off) {
831 auto th = p.get_header<tcp_hdr>(off);
832 if (th) {
833 out_hash_data.push_back(th->src_port);
834 out_hash_data.push_back(th->dst_port);
835 }
836 return true;
837 }
838
839 template <typename InetTraits>
840 void tcp<InetTraits>::received(Packet p, ipaddr from, ipaddr to) {
841 auto th = p.get_header<tcp_hdr>(0);
842 if (!th) {
843 return;
844 }
845 // th->data_offset is correct even before ntoh()
846 if (unsigned(th->data_offset * 4) < sizeof(*th)) {
847 return;
848 }
849
850 if (!get_hw_features().rx_csum_offload) {
851 checksummer csum;
852 InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
853 csum.sum(p);
854 if (csum.get() != 0) {
855 return;
856 }
857 }
858 auto h = th->ntoh();
859 auto id = connid{to, from, h.dst_port, h.src_port};
860 auto tcbi = _tcbs.find(id);
861 lw_shared_ptr<tcb> tcbp;
862 if (tcbi == _tcbs.end()) {
863 auto listener = _listening.find(id.local_port);
864 if (listener == _listening.end() || listener->second->full()) {
865 // 1) In CLOSE state
866 // 1.1 all data in the incoming segment is discarded. An incoming
867 // segment containing a RST is discarded. An incoming segment not
868 // containing a RST causes a RST to be sent in response.
869 // FIXME:
870 // if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK>
871 // if ACK on: <SEQ=SEG.ACK><CTL=RST>
872 return respond_with_reset(&h, id.local_ip, id.foreign_ip);
873 } else {
874 // 2) In LISTEN state
875 // 2.1 first check for an RST
876 if (h.f_rst) {
877 // An incoming RST should be ignored
878 return;
879 }
880 // 2.2 second check for an ACK
881 if (h.f_ack) {
882 // Any acknowledgment is bad if it arrives on a connection
883 // still in the LISTEN state.
884 // <SEQ=SEG.ACK><CTL=RST>
885 return respond_with_reset(&h, id.local_ip, id.foreign_ip);
886 }
887 // 2.3 third check for a SYN
888 if (h.f_syn) {
889 // check the security
890 // NOTE: Ignored for now
891 tcbp = make_lw_shared<tcb>(*this, id);
892 _tcbs.insert({id, tcbp});
893 return tcbp->input_handle_listen_state(&h, std::move(p));
894 }
895 // 2.4 fourth other text or control
896 // So you are unlikely to get here, but if you do, drop the
897 // segment, and return.
898 return;
899 }
900 } else {
901 tcbp = tcbi->second;
902 if (tcbp->state() == tcp_state::SYN_SENT) {
903 // 3) In SYN_SENT State
904 return tcbp->input_handle_syn_sent_state(&h, std::move(p));
905 } else {
906 // 4) In other state, can be one of the following:
907 // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
908 // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
909 return tcbp->input_handle_other_state(&h, std::move(p));
910 }
911 }
912 }
913
914 // Send packet does not belong to any tcb
915 template <typename InetTraits>
916 void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, Packet p) {
917 if (_queue_space.get_or_fail(p.len())) { // drop packets that do not fit the queue
918 _inet.wait_l2_dst_address(to, std::move(p), [this, to] (const ethernet_address &e_dst, Packet p, int r) mutable {
919 if (r == 0)
920 _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
921 });
922 }
923 }
924
925 template <typename InetTraits>
926 tcp<InetTraits>::connection::~connection() {
927 if (_tcb) {
928 _tcb->_conn = nullptr;
929 close_read();
930 close_write();
931 }
932 }
933
934 template <typename InetTraits>
935 tcp<InetTraits>::tcb::tcb(tcp& t, connid id)
936 : _tcp(t), manager(t.manager), _local_ip(id.local_ip) , _foreign_ip(id.foreign_ip),
937 _local_port(id.local_port), _foreign_port(id.foreign_port),
938 _snd(_tcp.cct),
939 center(t.center),
940 fd(t.manager.get_eventfd()),
941 delayed_ack_event(new tcp<InetTraits>::C_handle_delayed_ack(this)),
942 retransmit_event(new tcp<InetTraits>::C_handle_retransmit(this)),
943 persist_event(new tcp<InetTraits>::C_handle_persist(this)),
944 all_data_ack_event(new tcp<InetTraits>::C_all_data_acked(this)) {}
945
946 template <typename InetTraits>
947 tcp<InetTraits>::tcb::~tcb()
948 {
949 if (_delayed_ack_fd)
950 center->delete_time_event(*_delayed_ack_fd);
951 if (retransmit_fd)
952 center->delete_time_event(*retransmit_fd);
953 if (persist_fd)
954 center->delete_time_event(*persist_fd);
955 delete delayed_ack_event;
956 delete retransmit_event;
957 delete persist_event;
958 delete all_data_ack_event;
959 manager.close(fd);
960 fd = -1;
961 }
962
963 template <typename InetTraits>
964 void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth)
965 {
966 _tcp.respond_with_reset(rth, _local_ip, _foreign_ip);
967 }
968
969 template <typename InetTraits>
970 uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_sequence seg_ack) {
971 uint32_t total_acked_bytes = 0;
972 // Full ACK of segment
973 while (!_snd.data.empty()
974 && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) {
975 auto acked_bytes = _snd.data.front().p.len();
976 _snd.unacknowledged += acked_bytes;
977 // Ignore retransmitted segments when setting the RTO
978 if (_snd.data.front().nr_transmits == 0) {
979 update_rto(_snd.data.front().tx_time);
980 }
981 update_cwnd(acked_bytes);
982 total_acked_bytes += acked_bytes;
983 _snd.user_queue_space.put(_snd.data.front().data_len);
984 manager.notify(fd, EVENT_WRITABLE);
985 _snd.data.pop_front();
986 }
987 // Partial ACK of segment
988 if (_snd.unacknowledged < seg_ack) {
989 auto acked_bytes = seg_ack - _snd.unacknowledged;
990 if (!_snd.data.empty()) {
991 auto& unacked_seg = _snd.data.front();
992 unacked_seg.p.trim_front(acked_bytes);
993 }
994 _snd.unacknowledged = seg_ack;
995 update_cwnd(acked_bytes);
996 total_acked_bytes += acked_bytes;
997 }
998 return total_acked_bytes;
999 }
1000
1001 template <typename InetTraits>
1002 bool tcp<InetTraits>::tcb::segment_acceptable(tcp_sequence seg_seq, unsigned seg_len) {
1003 if (seg_len == 0 && _rcv.window == 0) {
1004 // SEG.SEQ = RCV.NXT
1005 return seg_seq == _rcv.next;
1006 } else if (seg_len == 0 && _rcv.window > 0) {
1007 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1008 return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window);
1009 } else if (seg_len > 0 && _rcv.window > 0) {
1010 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1011 // or
1012 // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
1013 bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window);
1014 bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window);
1015 return x || y;
1016 } else {
1017 // SEG.LEN > 0 RCV.WND = 0, not acceptable
1018 return false;
1019 }
1020 }
1021
1022 template <typename InetTraits>
1023 void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) {
1024 // Handle tcp options
1025 _option.parse(opt_start, opt_end);
1026
1027 // Remote receive window scale factor
1028 _snd.window_scale = _option._remote_win_scale;
1029 // Local receive window scale factor
1030 _rcv.window_scale = _option._local_win_scale;
1031
1032 // Maximum segment size remote can receive
1033 _snd.mss = _option._remote_mss;
1034 // Maximum segment size local can receive
1035 _rcv.mss = _option._local_mss = local_mss();
1036
1037 // Linux's default window size
1038 _rcv.window = 29200 << _rcv.window_scale;
1039 _snd.window = th->window << _snd.window_scale;
1040
1041 // Segment sequence number used for last window update
1042 _snd.wl1 = th->seq;
1043 // Segment acknowledgment number used for last window update
1044 _snd.wl2 = th->ack;
1045
1046 // Setup initial congestion window
1047 if (2190 < _snd.mss) {
1048 _snd.cwnd = 2 * _snd.mss;
1049 } else if (1095 < _snd.mss && _snd.mss <= 2190) {
1050 _snd.cwnd = 3 * _snd.mss;
1051 } else {
1052 _snd.cwnd = 4 * _snd.mss;
1053 }
1054
1055 // Setup initial slow start threshold
1056 _snd.ssthresh = th->window << _snd.window_scale;
1057 }
1058
1059 template <typename InetTraits>
1060 Packet tcp<InetTraits>::tcb::get_transmit_packet() {
1061 // easy case: empty queue
1062 if (_snd.unsent.empty()) {
1063 return Packet();
1064 }
1065 auto can_send = this->can_send();
1066 // Max number of TCP payloads we can pass to NIC
1067 uint32_t len;
1068 if (_tcp.get_hw_features().tx_tso) {
1069 // FIXME: Info tap device the size of the split packet
1070 len = _tcp.get_hw_features().max_packet_len - tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
1071 } else {
1072 len = std::min(uint16_t(_tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
1073 }
1074 can_send = std::min(can_send, len);
1075 // easy case: one small packet
1076 if (_snd.unsent.front().len() <= can_send) {
1077 auto p = std::move(_snd.unsent.front());
1078 _snd.unsent.pop_front();
1079 _snd.unsent_len -= p.len();
1080 return p;
1081 }
1082 // moderate case: need to split one packet
1083 if (_snd.unsent.front().len() > can_send) {
1084 auto p = _snd.unsent.front().share(0, can_send);
1085 _snd.unsent.front().trim_front(can_send);
1086 _snd.unsent_len -= p.len();
1087 return p;
1088 }
1089 // hard case: merge some packets, possibly split last
1090 auto p = std::move(_snd.unsent.front());
1091 _snd.unsent.pop_front();
1092 can_send -= p.len();
1093 while (!_snd.unsent.empty()
1094 && _snd.unsent.front().len() <= can_send) {
1095 can_send -= _snd.unsent.front().len();
1096 p.append(std::move(_snd.unsent.front()));
1097 _snd.unsent.pop_front();
1098 }
1099 // FIXME: this will result in calling "deleter" of packet which free managed objects
1100 // will used later
1101 // if (!_snd.unsent.empty() && can_send) {
1102 // auto& q = _snd.unsent.front();
1103 // p.append(q.share(0, can_send));
1104 // q.trim_front(can_send);
1105 // }
1106 _snd.unsent_len -= p.len();
1107 return p;
1108 }
1109
1110 template <typename InetTraits>
1111 void tcp<InetTraits>::tcb::output_one(bool data_retransmit) {
1112 if (in_state(CLOSED)) {
1113 return;
1114 }
1115
1116 Packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet();
1117 Packet clone = p.share(); // early clone to prevent share() from calling packet::unuse_internal_data() on header.
1118 uint16_t len = p.len();
1119 bool syn_on = syn_needs_on();
1120 bool ack_on = ack_needs_on();
1121
1122 auto options_size = _option.get_size(syn_on, ack_on);
1123 auto th = p.prepend_header<tcp_hdr>(options_size);
1124
1125 th->src_port = _local_port;
1126 th->dst_port = _foreign_port;
1127
1128 th->f_syn = syn_on;
1129 th->f_ack = ack_on;
1130 if (ack_on) {
1131 clear_delayed_ack();
1132 }
1133 th->f_urg = false;
1134 th->f_psh = false;
1135
1136 tcp_sequence seq;
1137 if (data_retransmit) {
1138 seq = _snd.unacknowledged;
1139 } else {
1140 seq = syn_on ? _snd.initial : _snd.next;
1141 _snd.next += len;
1142 }
1143 th->seq = seq;
1144 th->ack = _rcv.next;
1145 th->data_offset = (sizeof(*th) + options_size) / 4;
1146 th->window = _rcv.window >> _rcv.window_scale;
1147 th->checksum = 0;
1148
1149 // FIXME: does the FIN have to fit in the window?
1150 bool fin_on = fin_needs_on();
1151 th->f_fin = fin_on;
1152
1153 // Add tcp options
1154 _option.fill(th, options_size);
1155 *th = th->hton();
1156
1157 offload_info oi;
1158 checksummer csum;
1159 uint16_t pseudo_hdr_seg_len = 0;
1160
1161 oi.tcp_hdr_len = sizeof(tcp_hdr) + options_size;
1162
1163 if (_tcp.get_hw_features().tx_csum_l4_offload) {
1164 oi.needs_csum = true;
1165
1166 //
1167 // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's
1168 // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones'
1169 // complement sum of the pseudo header.
1170 //
1171 // For TSO the csum should be calculated for a pseudo header with
1172 // segment length set to 0. All the rest is the same as for a TCP Tx
1173 // CSUM offload case.
1174 //
1175 if (_tcp.get_hw_features().tx_tso && len > _snd.mss) {
1176 oi.tso_seg_size = _snd.mss;
1177 } else {
1178 pseudo_hdr_seg_len = sizeof(*th) + options_size + len;
1179 }
1180 } else {
1181 pseudo_hdr_seg_len = sizeof(*th) + options_size + len;
1182 oi.needs_csum = false;
1183 }
1184
1185 InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip,
1186 pseudo_hdr_seg_len);
1187
1188 if (_tcp.get_hw_features().tx_csum_l4_offload) {
1189 th->checksum = ~csum.get();
1190 } else {
1191 csum.sum(p);
1192 th->checksum = csum.get();
1193 }
1194
1195 oi.protocol = ip_protocol_num::tcp;
1196
1197 p.set_offload_info(oi);
1198
1199 if (!data_retransmit && (len || syn_on || fin_on)) {
1200 auto now = clock_type::now();
1201 if (len) {
1202 unsigned nr_transmits = 0;
1203 _snd.data.emplace_back(unacked_segment{std::move(clone),
1204 len, nr_transmits, now});
1205 }
1206 if (!retransmit_fd) {
1207 start_retransmit_timer();
1208 }
1209 }
1210
1211 queue_packet(std::move(p));
1212 }
1213
1214 template <typename InetTraits>
1215 bool tcp<InetTraits>::tcb::is_all_data_acked() {
1216 if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) {
1217 return true;
1218 }
1219 return false;
1220 }
1221
1222 template <typename InetTraits>
1223 std::optional<Packet> tcp<InetTraits>::tcb::read() {
1224 std::optional<Packet> p;
1225 if (_rcv.data.empty())
1226 return p;
1227
1228 p.emplace();
1229 for (auto&& q : _rcv.data) {
1230 p->append(std::move(q));
1231 }
1232 _rcv.data.clear();
1233 return p;
1234 }
1235
1236 template <typename InetTraits>
1237 int tcp<InetTraits>::tcb::send(Packet p) {
1238 // We can not send after the connection is closed
1239 ceph_assert(!_snd.closed);
1240
1241 if (in_state(CLOSED))
1242 return -ECONNRESET;
1243
1244 auto len = p.len();
1245 if (!_snd.user_queue_space.get_or_fail(len)) {
1246 // note: caller must ensure enough queue space to send
1247 ceph_abort();
1248 }
1249 // TODO: Handle p.len() > max user_queue_space case
1250 _snd.queued_len += len;
1251 _snd.unsent_len += len;
1252 _snd.queued_len -= len;
1253 _snd.unsent.push_back(std::move(p));
1254 if (can_send() > 0) {
1255 output();
1256 }
1257 return len;
1258 }
1259
1260 template <typename InetTraits>
1261 void tcp<InetTraits>::tcb::close() {
1262 if (in_state(CLOSED) || _snd.closed) {
1263 return ;
1264 }
1265 // TODO: We should make this asynchronous
1266
1267 _errno = -EPIPE;
1268 center->delete_file_event(fd, EVENT_READABLE|EVENT_WRITABLE);
1269 bool acked = is_all_data_acked();
1270 if (!acked) {
1271 _snd._all_data_acked_fd = manager.get_eventfd();
1272 center->create_file_event(_snd._all_data_acked_fd, EVENT_READABLE, all_data_ack_event);
1273 } else {
1274 close_final_cleanup();
1275 }
1276 }
1277
1278 template <typename InetTraits>
1279 bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) {
1280 // We've received a TSO packet, do ack immediately
1281 if (seg_len > _rcv.mss) {
1282 _nr_full_seg_received = 0;
1283 if (_delayed_ack_fd) {
1284 center->delete_time_event(*_delayed_ack_fd);
1285 _delayed_ack_fd.reset();
1286 }
1287 return true;
1288 }
1289
1290 // We've received a full sized segment, ack for every second full sized segment
1291 if (seg_len == _rcv.mss) {
1292 if (_nr_full_seg_received++ >= 1) {
1293 _nr_full_seg_received = 0;
1294 if (_delayed_ack_fd) {
1295 center->delete_time_event(*_delayed_ack_fd);
1296 _delayed_ack_fd.reset();
1297 }
1298 return true;
1299 }
1300 }
1301
1302 // If the timer is armed and its callback hasn't been run.
1303 if (_delayed_ack_fd) {
1304 return false;
1305 }
1306
1307 // If the timer is not armed, schedule a delayed ACK.
1308 // The maximum delayed ack timer allowed by RFC1122 is 500ms, most
1309 // implementations use 200ms.
1310 _delayed_ack_fd.emplace(center->create_time_event(200*1000, delayed_ack_event));
1311 return false;
1312 }
1313
1314 template <typename InetTraits>
1315 void tcp<InetTraits>::tcb::clear_delayed_ack() {
1316 if (_delayed_ack_fd) {
1317 center->delete_time_event(*_delayed_ack_fd);
1318 _delayed_ack_fd.reset();
1319 }
1320 }
1321
1322 template <typename InetTraits>
1323 bool tcp<InetTraits>::tcb::merge_out_of_order() {
1324 bool merged = false;
1325 if (_rcv.out_of_order.map.empty()) {
1326 return merged;
1327 }
1328 for (auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) {
1329 auto& p = it->second;
1330 auto seg_beg = it->first;
1331 auto seg_len = p.len();
1332 auto seg_end = seg_beg + seg_len;
1333 if (seg_beg <= _rcv.next && seg_end > _rcv.next) {
1334 // This segment has been received out of order and its previous
1335 // segment has been received now
1336 auto trim = _rcv.next - seg_beg;
1337 if (trim) {
1338 p.trim_front(trim);
1339 seg_len -= trim;
1340 }
1341 _rcv.next += seg_len;
1342 _rcv.data.push_back(std::move(p));
1343 // Since c++11, erase() always returns the value of the following element
1344 it = _rcv.out_of_order.map.erase(it);
1345 merged = true;
1346 } else if (_rcv.next >= seg_end) {
1347 // This segment has been receive already, drop it
1348 it = _rcv.out_of_order.map.erase(it);
1349 } else {
1350 // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only,
1351 // so we can stop looking here.
1352 it++;
1353 break;
1354 }
1355 }
1356 return merged;
1357 }
1358
1359 template <typename InetTraits>
1360 void tcp<InetTraits>::tcb::insert_out_of_order(tcp_sequence seg, Packet p) {
1361 _rcv.out_of_order.merge(seg, std::move(p));
1362 }
1363
1364 template <typename InetTraits>
1365 void tcp<InetTraits>::tcb::trim_receive_data_after_window() {
1366 abort();
1367 }
1368
1369 template <typename InetTraits>
1370 void tcp<InetTraits>::tcb::fast_retransmit() {
1371 if (!_snd.data.empty()) {
1372 auto& unacked_seg = _snd.data.front();
1373 unacked_seg.nr_transmits++;
1374 retransmit_one();
1375 output();
1376 }
1377 }
1378
1379 template <typename InetTraits>
1380 void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) {
1381 // Update RTO according to RFC6298
1382 auto R = std::chrono::duration_cast<std::chrono::microseconds>(clock_type::now() - tx_time);
1383 if (_snd.first_rto_sample) {
1384 _snd.first_rto_sample = false;
1385 // RTTVAR <- R/2
1386 // SRTT <- R
1387 _snd.rttvar = R / 2;
1388 _snd.srtt = R;
1389 } else {
1390 // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
1391 // SRTT <- (1 - alpha) * SRTT + alpha * R'
1392 // where alpha = 1/8 and beta = 1/4
1393 auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt);
1394 _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4;
1395 _snd.srtt = _snd.srtt * 7 / 8 + R / 8;
1396 }
1397 // RTO <- SRTT + max(G, K * RTTVAR)
1398 _rto = _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar);
1399
1400 // Make sure 1 sec << _rto << 60 sec
1401 _rto = std::max(_rto, _rto_min);
1402 _rto = std::min(_rto, _rto_max);
1403 }
1404
1405 template <typename InetTraits>
1406 void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) {
1407 uint32_t smss = _snd.mss;
1408 if (_snd.cwnd < _snd.ssthresh) {
1409 // In slow start phase
1410 _snd.cwnd += std::min(acked_bytes, smss);
1411 } else {
1412 // In congestion avoidance phase
1413 uint32_t round_up = 1;
1414 _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd);
1415 }
1416 }
1417
1418
1419 template <typename InetTraits>
1420 void tcp<InetTraits>::tcb::cleanup() {
1421 manager.notify(fd, EVENT_READABLE);
1422 _snd.closed = true;
1423 _snd.unsent.clear();
1424 _snd.data.clear();
1425 _rcv.out_of_order.map.clear();
1426 _rcv.data.clear();
1427 stop_retransmit_timer();
1428 clear_delayed_ack();
1429 center->dispatch_event_external(new tcp<InetTraits>::C_actual_remove_tcb(this));
1430 remove_from_tcbs();
1431 }
1432
1433 template <typename InetTraits>
1434 tcp_sequence tcp<InetTraits>::tcb::get_isn() {
1435 // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers
1436 // with the expression:
1437 // ISN = M + F(localip, localport, remoteip, remoteport, secretkey)
1438 // M is the 4 microsecond timer
1439 using namespace std::chrono;
1440 uint32_t hash[4];
1441 hash[0] = _local_ip.ip;
1442 hash[1] = _foreign_ip.ip;
1443 hash[2] = (_local_port << 16) + _foreign_port;
1444 hash[3] = _isn_secret.key[15];
1445 ceph::crypto::MD5 md5;
1446 md5.Update((const unsigned char*)_isn_secret.key, sizeof(_isn_secret.key));
1447 md5.Final((unsigned char*)hash);
1448 auto seq = hash[0];
1449 auto m = duration_cast<microseconds>(clock_type::now().time_since_epoch());
1450 seq += m.count() / 4;
1451 return make_seq(seq);
1452 }
1453
1454 template <typename InetTraits>
1455 std::optional<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
1456 _poll_active = false;
1457 if (_packetq.empty()) {
1458 output_one();
1459 }
1460
1461 std::optional<typename InetTraits::l4packet> p;
1462 if (in_state(CLOSED)) {
1463 return p;
1464 }
1465
1466 ceph_assert(!_packetq.empty());
1467
1468 p = std::move(_packetq.front());
1469 _packetq.pop_front();
1470 if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0)) {
1471 // If there are packets to send in the queue or tcb is allowed to send
1472 // more add tcp back to polling set to keep sending. In addition, dupacks >= 3
1473 // is an indication that an segment is lost, stop sending more in this case.
1474 output();
1475 }
1476 return p;
1477 }
1478
1479 template <typename InetTraits>
1480 void tcp<InetTraits>::connection::close_read() {
1481 // do nothing
1482 // _tcb->manager.notify(_tcb->fd, EVENT_READABLE);
1483 }
1484
1485 template <typename InetTraits>
1486 void tcp<InetTraits>::connection::close_write() {
1487 _tcb->close();
1488 }
1489
1490 template <typename InetTraits>
1491 constexpr uint16_t tcp<InetTraits>::tcb::_max_nr_retransmit;
1492
1493 template <typename InetTraits>
1494 constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_min;
1495
1496 template <typename InetTraits>
1497 constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_max;
1498
1499 template <typename InetTraits>
1500 constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_clk_granularity;
1501
1502 template <typename InetTraits>
1503 typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret;
1504
1505
1506 #endif /* CEPH_DPDK_TCP_H_ */