1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 #ifndef CEPH_DPDK_TCP_H_
24 #define CEPH_DPDK_TCP_H_
26 #include <unordered_map>
33 #include <system_error>
35 #include "msg/async/dpdk/EventDPDK.h"
37 #include "include/utime.h"
38 #include "common/Throttle.h"
39 #include "common/ceph_time.h"
40 #include "common/ceph_crypto.h"
41 #include "msg/async/Event.h"
42 #include "IPChecksum.h"
45 #include "byteorder.h"
46 #include "shared_ptr.h"
47 #include "PacketUtil.h"
49 #include "include/random.h"
53 enum class tcp_state
: uint16_t {
57 SYN_RECEIVED
= (1 << 3),
58 ESTABLISHED
= (1 << 4),
59 FIN_WAIT_1
= (1 << 5),
60 FIN_WAIT_2
= (1 << 6),
61 CLOSE_WAIT
= (1 << 7),
67 inline tcp_state
operator|(tcp_state s1
, tcp_state s2
) {
68 return tcp_state(uint16_t(s1
) | uint16_t(s2
));
71 inline std::ostream
& operator<<(std::ostream
& str
, const tcp_state
& s
) {
73 case tcp_state::CLOSED
: return str
<< "CLOSED";
74 case tcp_state::LISTEN
: return str
<< "LISTEN";
75 case tcp_state::SYN_SENT
: return str
<< "SYN_SENT";
76 case tcp_state::SYN_RECEIVED
: return str
<< "SYN_RECEIVED";
77 case tcp_state::ESTABLISHED
: return str
<< "ESTABLISHED";
78 case tcp_state::FIN_WAIT_1
: return str
<< "FIN_WAIT_1";
79 case tcp_state::FIN_WAIT_2
: return str
<< "FIN_WAIT_2";
80 case tcp_state::CLOSE_WAIT
: return str
<< "CLOSE_WAIT";
81 case tcp_state::CLOSING
: return str
<< "CLOSING";
82 case tcp_state::LAST_ACK
: return str
<< "LAST_ACK";
83 case tcp_state::TIME_WAIT
: return str
<< "TIME_WAIT";
84 default: return str
<< "UNKNOWN";
// The kind and len fields of each option are fixed values defined by the
// TCP protocol.
// option_kind: the one-byte "kind" value that identifies a TCP option.
enum class option_kind : uint8_t {
  eol = 0,         // end of option list
  nop = 1,         // no-operation (padding)
  mss = 2,         // maximum segment size
  win_scale = 3,   // window scale
  sack = 4,        // SACK (as negotiated in the SYN)
  timestamps = 8,  // timestamps
};
// option_len: encoded length in bytes of each TCP option. These values are
// used to advance a raw option cursor (see the operator+= overloads on
// option_len). eol and nop are single-byte options.
enum class option_len : uint8_t {
  eol = 1,
  nop = 1,
  sack = 2,
  win_scale = 3,
  mss = 4,
  timestamps = 10,
};
93 option_kind kind
= option_kind::mss
;
94 option_len len
= option_len::mss
;
98 m
.mss
= ::hton(m
.mss
);
101 } __attribute__((packed
));
103 option_kind kind
= option_kind::win_scale
;
104 option_len len
= option_len::win_scale
;
106 } __attribute__((packed
));
108 option_kind kind
= option_kind::sack
;
109 option_len len
= option_len::sack
;
110 } __attribute__((packed
));
112 option_kind kind
= option_kind::timestamps
;
113 option_len len
= option_len::timestamps
;
116 } __attribute__((packed
));
118 option_kind kind
= option_kind::nop
;
119 } __attribute__((packed
));
121 option_kind kind
= option_kind::eol
;
122 } __attribute__((packed
));
123 static const uint8_t align
= 4;
125 void parse(uint8_t* beg
, uint8_t* end
);
126 uint8_t fill(tcp_hdr
* th
, uint8_t option_size
);
127 uint8_t get_size(bool syn_on
, bool ack_on
);
129 // For option negotiattion
130 bool _mss_received
= false;
131 bool _win_scale_received
= false;
132 bool _timestamps_received
= false;
133 bool _sack_received
= false;
136 uint16_t _remote_mss
= 536;
138 uint8_t _remote_win_scale
= 0;
139 uint8_t _local_win_scale
= 0;
141 inline uint8_t*& operator+=(uint8_t*& x
, tcp_option::option_len len
) { x
+= uint8_t(len
); return x
; }
142 inline uint8_t& operator+=(uint8_t& x
, tcp_option::option_len len
) { x
+= uint8_t(len
); return x
; }
144 struct tcp_sequence
{
148 tcp_sequence
ntoh(tcp_sequence ts
) {
149 return tcp_sequence
{ ::ntoh(ts
.raw
) };
152 tcp_sequence
hton(tcp_sequence ts
) {
153 return tcp_sequence
{ ::hton(ts
.raw
) };
156 inline std::ostream
& operator<<(std::ostream
& os
, const tcp_sequence
& s
) {
160 inline tcp_sequence
make_seq(uint32_t raw
) { return tcp_sequence
{raw
}; }
161 inline tcp_sequence
& operator+=(tcp_sequence
& s
, int32_t n
) { s
.raw
+= n
; return s
; }
162 inline tcp_sequence
& operator-=(tcp_sequence
& s
, int32_t n
) { s
.raw
-= n
; return s
; }
163 inline tcp_sequence
operator+(tcp_sequence s
, int32_t n
) { return s
+= n
; }
164 inline tcp_sequence
operator-(tcp_sequence s
, int32_t n
) { return s
-= n
; }
165 inline int32_t operator-(tcp_sequence s
, tcp_sequence q
) { return s
.raw
- q
.raw
; }
166 inline bool operator==(tcp_sequence s
, tcp_sequence q
) { return s
.raw
== q
.raw
; }
167 inline bool operator!=(tcp_sequence s
, tcp_sequence q
) { return !(s
== q
); }
168 inline bool operator<(tcp_sequence s
, tcp_sequence q
) { return s
- q
< 0; }
169 inline bool operator>(tcp_sequence s
, tcp_sequence q
) { return q
< s
; }
170 inline bool operator<=(tcp_sequence s
, tcp_sequence q
) { return !(s
> q
); }
171 inline bool operator>=(tcp_sequence s
, tcp_sequence q
) { return !(s
< q
); }
179 uint8_t data_offset
: 4;
193 hdr
.src_port
= ::hton(src_port
);
194 hdr
.dst_port
= ::hton(dst_port
);
195 hdr
.seq
= ::hton(seq
);
196 hdr
.ack
= ::hton(ack
);
197 hdr
.window
= ::hton(window
);
198 hdr
.checksum
= ::hton(checksum
);
199 hdr
.urgent
= ::hton(urgent
);
205 hdr
.src_port
= ::ntoh(src_port
);
206 hdr
.dst_port
= ::ntoh(dst_port
);
207 hdr
.seq
= ::ntoh(seq
);
208 hdr
.ack
= ::ntoh(ack
);
209 hdr
.window
= ::ntoh(window
);
210 hdr
.checksum
= ::ntoh(checksum
);
211 hdr
.urgent
= ::ntoh(urgent
);
214 } __attribute__((packed
));
217 using tcp_packet_merger
= packet_merger
<tcp_sequence
, tcp_tag
>;
219 template <typename InetTraits
>
222 using ipaddr
= typename
InetTraits::address_type
;
223 using inet_type
= typename
InetTraits::inet_type
;
224 using connid
= l4connid
<InetTraits
>;
225 using connid_hash
= typename
connid::connid_hash
;
231 class C_handle_delayed_ack
: public EventCallback
{
235 C_handle_delayed_ack(tcb
*t
): tc(t
) { }
236 void do_request(uint64_t r
) {
237 tc
->_delayed_ack_fd
.destroy();
238 tc
->_nr_full_seg_received
= 0;
243 class C_handle_retransmit
: public EventCallback
{
247 C_handle_retransmit(tcb
*t
): tc(t
) { }
248 void do_request(uint64_t r
) {
249 tc
->retransmit_fd
.destroy();
254 class C_handle_persist
: public EventCallback
{
258 C_handle_persist(tcb
*t
): tc(t
) { }
259 void do_request(uint64_t r
) {
260 tc
->persist_fd
.destroy();
265 class C_all_data_acked
: public EventCallback
{
269 C_all_data_acked(tcb
*t
): tc(t
) {}
270 void do_request(uint64_t fd_or_id
) {
271 tc
->close_final_cleanup();
275 class C_actual_remove_tcb
: public EventCallback
{
276 lw_shared_ptr
<tcb
> tc
;
278 C_actual_remove_tcb(tcb
*t
): tc(t
->shared_from_this()) {}
279 void do_request(uint64_t r
) {
284 class tcb
: public enable_lw_shared_from_this
<tcb
> {
285 using clock_type
= ceph::coarse_real_clock
;
286 static constexpr tcp_state CLOSED
= tcp_state::CLOSED
;
287 static constexpr tcp_state LISTEN
= tcp_state::LISTEN
;
288 static constexpr tcp_state SYN_SENT
= tcp_state::SYN_SENT
;
289 static constexpr tcp_state SYN_RECEIVED
= tcp_state::SYN_RECEIVED
;
290 static constexpr tcp_state ESTABLISHED
= tcp_state::ESTABLISHED
;
291 static constexpr tcp_state FIN_WAIT_1
= tcp_state::FIN_WAIT_1
;
292 static constexpr tcp_state FIN_WAIT_2
= tcp_state::FIN_WAIT_2
;
293 static constexpr tcp_state CLOSE_WAIT
= tcp_state::CLOSE_WAIT
;
294 static constexpr tcp_state CLOSING
= tcp_state::CLOSING
;
295 static constexpr tcp_state LAST_ACK
= tcp_state::LAST_ACK
;
296 static constexpr tcp_state TIME_WAIT
= tcp_state::TIME_WAIT
;
297 tcp_state _state
= CLOSED
;
299 UserspaceEventManager
&manager
;
300 connection
* _conn
= nullptr;
301 bool _connect_done
= false;
304 uint16_t _local_port
;
305 uint16_t _foreign_port
;
306 struct unacked_segment
{
309 unsigned nr_transmits
;
310 clock_type::time_point tx_time
;
313 tcp_sequence unacknowledged
;
316 uint8_t window_scale
;
321 tcp_sequence initial
;
322 std::deque
<unacked_segment
> data
;
323 std::deque
<Packet
> unsent
;
324 uint32_t unsent_len
= 0;
325 uint32_t queued_len
= 0;
327 // Wait for all data are acked
328 int _all_data_acked_fd
= -1;
329 // Limit number of data queued into send queue
330 Throttle user_queue_space
;
331 // Round-trip time variation
332 std::chrono::microseconds rttvar
;
333 // Smoothed round-trip time
334 std::chrono::microseconds srtt
;
335 bool first_rto_sample
= true;
336 clock_type::time_point syn_tx_time
;
339 // Slow start threshold
342 uint16_t dupacks
= 0;
343 unsigned syn_retransmit
= 0;
344 unsigned fin_retransmit
= 0;
345 uint32_t limited_transfer
= 0;
346 uint32_t partial_ack
= 0;
347 tcp_sequence recover
;
348 bool window_probe
= false;
349 send(CephContext
*c
): user_queue_space(c
, "DPDK::tcp::tcb::user_queue_space", 81920) {}
354 uint8_t window_scale
;
357 tcp_sequence initial
;
358 std::deque
<Packet
> data
;
359 tcp_packet_merger out_of_order
;
363 // positive means no errno, 0 means eof, nagetive means error
366 EventCallbackRef delayed_ack_event
;
367 Tub
<uint64_t> _delayed_ack_fd
;
368 // Retransmission timeout
369 std::chrono::microseconds _rto
{1000*1000};
370 std::chrono::microseconds _persist_time_out
{1000*1000};
371 static constexpr std::chrono::microseconds _rto_min
{1000*1000};
372 static constexpr std::chrono::microseconds _rto_max
{60000*1000};
374 static constexpr std::chrono::microseconds _rto_clk_granularity
{1000};
375 static constexpr uint16_t _max_nr_retransmit
{5};
376 EventCallbackRef retransmit_event
;
377 Tub
<uint64_t> retransmit_fd
;
378 EventCallbackRef persist_event
;
379 EventCallbackRef all_data_ack_event
;
380 Tub
<uint64_t> persist_fd
;
381 uint16_t _nr_full_seg_received
= 0;
383 // 512 bits secretkey for ISN generating
386 for (auto& k
: key
) {
387 k
= ceph::util::generate_random_number
<uint32_t>(0, std::numeric_limits
<uint32_t>::max());
391 static isn_secret _isn_secret
;
392 tcp_sequence
get_isn();
393 circular_buffer
<typename
InetTraits::l4packet
> _packetq
;
394 bool _poll_active
= false;
397 void close_final_cleanup();
398 ostream
& _prefix(std::ostream
*_dout
);
401 tcb(tcp
& t
, connid id
);
403 void input_handle_listen_state(tcp_hdr
* th
, Packet p
);
404 void input_handle_syn_sent_state(tcp_hdr
* th
, Packet p
);
405 void input_handle_other_state(tcp_hdr
* th
, Packet p
);
406 void output_one(bool data_retransmit
= false);
407 bool is_all_data_acked();
412 void remove_from_tcbs() {
413 auto id
= connid
{_local_ip
, _foreign_ip
, _local_port
, _foreign_port
};
414 _tcp
._tcbs
.erase(id
);
416 Tub
<typename
InetTraits::l4packet
> get_packet();
421 auto tcb
= this->shared_from_this();
422 _tcp
._inet
.wait_l2_dst_address(_foreign_ip
, Packet(), [tcb
] (const ethernet_address
&dst
, Packet p
, int r
) {
424 tcb
->_tcp
.poll_tcb(dst
, std::move(tcb
));
425 } else if (r
== -ETIMEDOUT
) {
426 // in other states connection should time out
427 if (tcb
->in_state(SYN_SENT
)) {
428 tcb
->_errno
= -ETIMEDOUT
;
431 } else if (r
== -EBUSY
) {
433 tcb
->_poll_active
= false;
434 tcb
->start_retransmit_timer();
440 int16_t get_errno() const {
448 uint64_t peek_sent_available() {
449 if (!in_state(ESTABLISHED
))
451 uint64_t left
= _snd
.user_queue_space
.get_max() - _snd
.user_queue_space
.get_current();
455 int is_connected() const {
458 return _connect_done
;
462 void respond_with_reset(tcp_hdr
* th
);
463 bool merge_out_of_order();
464 void insert_out_of_order(tcp_sequence seq
, Packet p
);
465 void trim_receive_data_after_window();
466 bool should_send_ack(uint16_t seg_len
);
467 void clear_delayed_ack();
468 Packet
get_transmit_packet();
469 void retransmit_one() {
470 bool data_retransmit
= true;
471 output_one(data_retransmit
);
473 void start_retransmit_timer() {
475 center
->delete_time_event(*retransmit_fd
);
476 retransmit_fd
.construct(center
->create_time_event(_rto
.count(), retransmit_event
));
478 void stop_retransmit_timer() {
480 center
->delete_time_event(*retransmit_fd
);
481 retransmit_fd
.destroy();
484 void start_persist_timer() {
486 center
->delete_time_event(*persist_fd
);
487 persist_fd
.construct(center
->create_time_event(_persist_time_out
.count(), persist_event
));
489 void stop_persist_timer() {
491 center
->delete_time_event(*persist_fd
);
492 persist_fd
.destroy();
497 void fast_retransmit();
498 void update_rto(clock_type::time_point tx_time
);
499 void update_cwnd(uint32_t acked_bytes
);
501 uint32_t can_send() {
502 if (_snd
.window_probe
) {
505 // Can not send more than advertised window allows
506 auto x
= std::min(uint32_t(_snd
.unacknowledged
+ _snd
.window
- _snd
.next
), _snd
.unsent_len
);
507 // Can not send more than congestion window allows
508 x
= std::min(_snd
.cwnd
, x
);
509 if (_snd
.dupacks
== 1 || _snd
.dupacks
== 2) {
511 // Send cwnd + 2 * smss per RFC3042
512 auto flight
= flight_size();
513 auto max
= _snd
.cwnd
+ 2 * _snd
.mss
;
514 x
= flight
<= max
? std::min(x
, max
- flight
) : 0;
515 _snd
.limited_transfer
+= x
;
516 } else if (_snd
.dupacks
>= 3) {
518 // Sent 1 full-sized segment at most
519 x
= std::min(uint32_t(_snd
.mss
), x
);
523 uint32_t flight_size() {
525 std::for_each(_snd
.data
.begin(), _snd
.data
.end(),
526 [&] (unacked_segment
& seg
) { size
+= seg
.p
.len(); });
529 uint16_t local_mss() {
530 return _tcp
.get_hw_features().mtu
- tcp_hdr_len_min
- InetTraits::ip_hdr_len_min
;
532 void queue_packet(Packet p
) {
533 _packetq
.emplace_back(
534 typename
InetTraits::l4packet
{_foreign_ip
, std::move(p
)});
536 void signal_data_received() {
537 manager
.notify(fd
, EVENT_READABLE
);
539 void signal_all_data_acked() {
540 if (_snd
._all_data_acked_fd
>= 0 && _snd
.unsent_len
== 0 && _snd
.queued_len
== 0)
541 manager
.notify(_snd
._all_data_acked_fd
, EVENT_READABLE
);
545 _snd
.syn_tx_time
= clock_type::now();
546 // Send <SYN> to remote
549 void do_syn_received() {
550 _state
= SYN_RECEIVED
;
551 _snd
.syn_tx_time
= clock_type::now();
552 // Send <SYN,ACK> to remote
555 void do_established() {
556 _state
= ESTABLISHED
;
557 update_rto(_snd
.syn_tx_time
);
558 _connect_done
= true;
559 manager
.notify(fd
, EVENT_READABLE
|EVENT_WRITABLE
);
563 // Free packets to be sent which are waiting for user_queue_space
564 _snd
.user_queue_space
.reset();
566 _errno
= -ECONNRESET
;
567 manager
.notify(fd
, EVENT_READABLE
);
569 if (_snd
._all_data_acked_fd
>= 0)
570 manager
.notify(_snd
._all_data_acked_fd
, EVENT_READABLE
);
572 void do_time_wait() {
573 // FIXME: Implement TIME_WAIT state timer
581 void do_setup_isn() {
582 _snd
.initial
= get_isn();
583 _snd
.unacknowledged
= _snd
.initial
;
584 _snd
.next
= _snd
.initial
+ 1;
585 _snd
.recover
= _snd
.initial
;
587 void do_local_fin_acked() {
588 _snd
.unacknowledged
+= 1;
591 bool syn_needs_on() {
592 return in_state(SYN_SENT
| SYN_RECEIVED
);
594 bool fin_needs_on() {
595 return in_state(FIN_WAIT_1
| CLOSING
| LAST_ACK
) && _snd
.closed
&&
596 _snd
.unsent_len
== 0 && _snd
.queued_len
== 0;
598 bool ack_needs_on() {
599 return !in_state(CLOSED
| LISTEN
| SYN_SENT
);
601 bool foreign_will_not_send() {
602 return in_state(CLOSING
| TIME_WAIT
| CLOSE_WAIT
| LAST_ACK
| CLOSED
);
604 bool in_state(tcp_state state
) {
605 return uint16_t(_state
) & uint16_t(state
);
607 void exit_fast_recovery() {
609 _snd
.limited_transfer
= 0;
610 _snd
.partial_ack
= 0;
612 uint32_t data_segment_acked(tcp_sequence seg_ack
);
613 bool segment_acceptable(tcp_sequence seg_seq
, unsigned seg_len
);
614 void init_from_options(tcp_hdr
* th
, uint8_t* opt_start
, uint8_t* opt_end
);
615 friend class connection
;
617 friend class C_handle_delayed_ack
;
618 friend class C_handle_retransmit
;
619 friend class C_handle_persist
;
620 friend class C_all_data_acked
;
624 // ipv4_l4<ip_protocol_num::tcp>
627 UserspaceEventManager
&manager
;
628 std::unordered_map
<connid
, lw_shared_ptr
<tcb
>, connid_hash
> _tcbs
;
629 std::unordered_map
<uint16_t, listener
*> _listening
;
630 std::random_device _rd
;
631 std::default_random_engine _e
;
632 std::uniform_int_distribution
<uint16_t> _port_dist
{41952, 65535};
633 circular_buffer
<std::pair
<lw_shared_ptr
<tcb
>, ethernet_address
>> _poll_tcbs
;
634 // queue for packets that do not belong to any tcb
635 circular_buffer
<ipv4_traits::l4packet
> _packetq
;
636 Throttle _queue_space
;
637 // Limit number of data queued into send queue
640 lw_shared_ptr
<tcb
> _tcb
;
642 explicit connection(lw_shared_ptr
<tcb
> tcbp
) : _tcb(std::move(tcbp
)) { _tcb
->_conn
= this; }
643 connection(const connection
&) = delete;
644 connection(connection
&& x
) noexcept
: _tcb(std::move(x
._tcb
)) {
648 void operator=(const connection
&) = delete;
649 connection
& operator=(connection
&& x
) {
652 new (this) connection(std::move(x
));
660 return _tcb
->send(std::move(p
));
665 int16_t get_errno() const {
666 return _tcb
->get_errno();
670 entity_addr_t
remote_addr() const {
672 auto net_ip
= _tcb
->_foreign_ip
.hton();
673 memcpy((void*)&addr
.in4_addr().sin_addr
.s_addr
,
674 &net_ip
, sizeof(addr
.in4_addr().sin_addr
.s_addr
));
675 addr
.set_family(AF_INET
);
678 uint64_t peek_sent_available() {
679 return _tcb
->peek_sent_available();
681 int is_connected() const { return _tcb
->is_connected(); }
688 queue
<connection
> _q
;
689 size_t _q_max_length
;
692 listener(tcp
& t
, uint16_t port
, size_t queue_length
)
693 : _tcp(t
), _port(port
), _errno(0), _q(), _q_max_length(queue_length
) {
696 listener(const listener
&) = delete;
697 void operator=(const listener
&) = delete;
698 listener(listener
&& x
)
699 : _tcp(x
._tcp
), _port(x
._port
), _fd(std::move(x
._fd
)), _errno(x
._errno
),
700 _q(std::move(x
._q
)) {
702 _tcp
._listening
[_port
] = this;
708 if (_tcp
._listening
.find(_port
) != _tcp
._listening
.end())
710 _tcp
._listening
.emplace(_port
, this);
711 _fd
= _tcp
.manager
.get_eventfd();
714 Tub
<connection
> accept() {
717 c
= std::move(_q
.front());
722 void abort_accept() {
726 _tcp
._listening
.erase(_port
);
727 _tcp
.manager
.close(_fd
);
731 int16_t get_errno() const {
735 return _q
.size() == _q_max_length
;
743 explicit tcp(CephContext
*c
, inet_type
& inet
, EventCenter
*cen
);
744 void received(Packet p
, ipaddr from
, ipaddr to
);
745 bool forward(forward_hash
& out_hash_data
, Packet
& p
, size_t off
);
746 listener
listen(uint16_t port
, size_t queue_length
= 100);
747 connection
connect(const entity_addr_t
&addr
);
748 const hw_features
& get_hw_features() const { return _inet
._inet
.get_hw_features(); }
749 void poll_tcb(const ethernet_address
&dst
, lw_shared_ptr
<tcb
> tcb
) {
750 _poll_tcbs
.emplace_back(std::move(tcb
), dst
);
752 bool push_listen_queue(uint16_t port
, tcb
*t
) {
753 auto listener
= _listening
.find(port
);
754 if (listener
== _listening
.end() || listener
->second
->full()) {
757 listener
->second
->_q
.push(connection(t
->shared_from_this()));
758 manager
.notify(listener
->second
->_fd
, EVENT_READABLE
);
763 void send_packet_without_tcb(ipaddr from
, ipaddr to
, Packet p
);
764 void respond_with_reset(tcp_hdr
* rth
, ipaddr local_ip
, ipaddr foreign_ip
);
765 friend class listener
;
768 template <typename InetTraits
>
769 tcp
<InetTraits
>::tcp(CephContext
*c
, inet_type
& inet
, EventCenter
*cen
)
770 : cct(c
), _inet(inet
), center(cen
),
771 manager(static_cast<DPDKDriver
*>(cen
->get_driver())->manager
),
772 _e(_rd()), _queue_space(cct
, "DPDK::tcp::queue_space", 81920) {
774 _inet
.register_packet_provider([this, tcb_polled
] () mutable {
775 Tub
<typename
InetTraits::l4packet
> l4p
;
776 auto c
= _poll_tcbs
.size();
777 if (!_packetq
.empty() && (!(tcb_polled
% 128) || c
== 0)) {
778 l4p
= std::move(_packetq
.front());
779 _packetq
.pop_front();
780 _queue_space
.put(l4p
->p
.len());
784 lw_shared_ptr
<tcb
> tcb
;
785 ethernet_address dst
;
786 std::tie(tcb
, dst
) = std::move(_poll_tcbs
.front());
787 _poll_tcbs
.pop_front();
788 l4p
= std::move(tcb
->get_packet());
799 template <typename InetTraits
>
800 auto tcp
<InetTraits
>::listen(uint16_t port
, size_t queue_length
) -> listener
{
801 return listener(*this, port
, queue_length
);
804 template <typename InetTraits
>
805 typename tcp
<InetTraits
>::connection tcp
<InetTraits
>::connect(const entity_addr_t
&addr
) {
808 auto src_ip
= _inet
._inet
.host_address();
809 auto dst_ip
= ipv4_address(addr
);
810 auto dst_port
= addr
.get_port();
813 src_port
= _port_dist(_e
);
814 id
= connid
{src_ip
, dst_ip
, src_port
, (uint16_t)dst_port
};
815 if (_tcbs
.find(id
) == _tcbs
.end()) {
816 if (_inet
._inet
.netif()->hw_queues_count() == 1 ||
817 _inet
._inet
.netif()->hash2cpu(
818 id
.hash(_inet
._inet
.netif()->rss_key())) == center
->get_id())
823 auto tcbp
= make_lw_shared
<tcb
>(*this, id
);
824 _tcbs
.insert({id
, tcbp
});
826 return connection(tcbp
);
829 template <typename InetTraits
>
830 bool tcp
<InetTraits
>::forward(forward_hash
& out_hash_data
, Packet
& p
, size_t off
) {
831 auto th
= p
.get_header
<tcp_hdr
>(off
);
833 out_hash_data
.push_back(th
->src_port
);
834 out_hash_data
.push_back(th
->dst_port
);
839 template <typename InetTraits
>
840 void tcp
<InetTraits
>::received(Packet p
, ipaddr from
, ipaddr to
) {
841 auto th
= p
.get_header
<tcp_hdr
>(0);
845 // th->data_offset is correct even before ntoh()
846 if (unsigned(th
->data_offset
* 4) < sizeof(*th
)) {
850 if (!get_hw_features().rx_csum_offload
) {
852 InetTraits::tcp_pseudo_header_checksum(csum
, from
, to
, p
.len());
854 if (csum
.get() != 0) {
859 auto id
= connid
{to
, from
, h
.dst_port
, h
.src_port
};
860 auto tcbi
= _tcbs
.find(id
);
861 lw_shared_ptr
<tcb
> tcbp
;
862 if (tcbi
== _tcbs
.end()) {
863 auto listener
= _listening
.find(id
.local_port
);
864 if (listener
== _listening
.end() || listener
->second
->full()) {
866 // 1.1 all data in the incoming segment is discarded. An incoming
867 // segment containing a RST is discarded. An incoming segment not
868 // containing a RST causes a RST to be sent in response.
870 // if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK>
871 // if ACK on: <SEQ=SEG.ACK><CTL=RST>
872 return respond_with_reset(&h
, id
.local_ip
, id
.foreign_ip
);
874 // 2) In LISTEN state
875 // 2.1 first check for an RST
877 // An incoming RST should be ignored
880 // 2.2 second check for an ACK
882 // Any acknowledgment is bad if it arrives on a connection
883 // still in the LISTEN state.
884 // <SEQ=SEG.ACK><CTL=RST>
885 return respond_with_reset(&h
, id
.local_ip
, id
.foreign_ip
);
887 // 2.3 third check for a SYN
889 // check the security
890 // NOTE: Ignored for now
891 tcbp
= make_lw_shared
<tcb
>(*this, id
);
892 _tcbs
.insert({id
, tcbp
});
893 return tcbp
->input_handle_listen_state(&h
, std::move(p
));
895 // 2.4 fourth other text or control
896 // So you are unlikely to get here, but if you do, drop the
897 // segment, and return.
902 if (tcbp
->state() == tcp_state::SYN_SENT
) {
903 // 3) In SYN_SENT State
904 return tcbp
->input_handle_syn_sent_state(&h
, std::move(p
));
906 // 4) In other state, can be one of the following:
907 // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
908 // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
909 return tcbp
->input_handle_other_state(&h
, std::move(p
));
914 // Send packet does not belong to any tcb
915 template <typename InetTraits
>
916 void tcp
<InetTraits
>::send_packet_without_tcb(ipaddr from
, ipaddr to
, Packet p
) {
917 if (_queue_space
.get_or_fail(p
.len())) { // drop packets that do not fit the queue
918 _inet
.wait_l2_dst_address(to
, std::move(p
), [this, to
] (const ethernet_address
&e_dst
, Packet p
, int r
) mutable {
920 _packetq
.emplace_back(ipv4_traits::l4packet
{to
, std::move(p
), e_dst
, ip_protocol_num::tcp
});
925 template <typename InetTraits
>
926 tcp
<InetTraits
>::connection::~connection() {
928 _tcb
->_conn
= nullptr;
934 template <typename InetTraits
>
935 tcp
<InetTraits
>::tcb::tcb(tcp
& t
, connid id
)
936 : _tcp(t
), manager(t
.manager
), _local_ip(id
.local_ip
) , _foreign_ip(id
.foreign_ip
),
937 _local_port(id
.local_port
), _foreign_port(id
.foreign_port
),
940 fd(t
.manager
.get_eventfd()),
941 delayed_ack_event(new tcp
<InetTraits
>::C_handle_delayed_ack(this)),
942 retransmit_event(new tcp
<InetTraits
>::C_handle_retransmit(this)),
943 persist_event(new tcp
<InetTraits
>::C_handle_persist(this)),
944 all_data_ack_event(new tcp
<InetTraits
>::C_all_data_acked(this)) {}
946 template <typename InetTraits
>
947 tcp
<InetTraits
>::tcb::~tcb()
950 center
->delete_time_event(*_delayed_ack_fd
);
952 center
->delete_time_event(*retransmit_fd
);
954 center
->delete_time_event(*persist_fd
);
955 delete delayed_ack_event
;
956 delete retransmit_event
;
957 delete persist_event
;
958 delete all_data_ack_event
;
963 template <typename InetTraits
>
964 void tcp
<InetTraits
>::tcb::respond_with_reset(tcp_hdr
* rth
)
966 _tcp
.respond_with_reset(rth
, _local_ip
, _foreign_ip
);
969 template <typename InetTraits
>
970 uint32_t tcp
<InetTraits
>::tcb::data_segment_acked(tcp_sequence seg_ack
) {
971 uint32_t total_acked_bytes
= 0;
972 // Full ACK of segment
973 while (!_snd
.data
.empty()
974 && (_snd
.unacknowledged
+ _snd
.data
.front().p
.len() <= seg_ack
)) {
975 auto acked_bytes
= _snd
.data
.front().p
.len();
976 _snd
.unacknowledged
+= acked_bytes
;
977 // Ignore retransmitted segments when setting the RTO
978 if (_snd
.data
.front().nr_transmits
== 0) {
979 update_rto(_snd
.data
.front().tx_time
);
981 update_cwnd(acked_bytes
);
982 total_acked_bytes
+= acked_bytes
;
983 _snd
.user_queue_space
.put(_snd
.data
.front().data_len
);
984 manager
.notify(fd
, EVENT_WRITABLE
);
985 _snd
.data
.pop_front();
987 // Partial ACK of segment
988 if (_snd
.unacknowledged
< seg_ack
) {
989 auto acked_bytes
= seg_ack
- _snd
.unacknowledged
;
990 if (!_snd
.data
.empty()) {
991 auto& unacked_seg
= _snd
.data
.front();
992 unacked_seg
.p
.trim_front(acked_bytes
);
994 _snd
.unacknowledged
= seg_ack
;
995 update_cwnd(acked_bytes
);
996 total_acked_bytes
+= acked_bytes
;
998 return total_acked_bytes
;
1001 template <typename InetTraits
>
1002 bool tcp
<InetTraits
>::tcb::segment_acceptable(tcp_sequence seg_seq
, unsigned seg_len
) {
1003 if (seg_len
== 0 && _rcv
.window
== 0) {
1004 // SEG.SEQ = RCV.NXT
1005 return seg_seq
== _rcv
.next
;
1006 } else if (seg_len
== 0 && _rcv
.window
> 0) {
1007 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1008 return (_rcv
.next
<= seg_seq
) && (seg_seq
< _rcv
.next
+ _rcv
.window
);
1009 } else if (seg_len
> 0 && _rcv
.window
> 0) {
1010 // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1012 // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
1013 bool x
= (_rcv
.next
<= seg_seq
) && seg_seq
< (_rcv
.next
+ _rcv
.window
);
1014 bool y
= (_rcv
.next
<= seg_seq
+ seg_len
- 1) && (seg_seq
+ seg_len
- 1 < _rcv
.next
+ _rcv
.window
);
1017 // SEG.LEN > 0 RCV.WND = 0, not acceptable
1022 template <typename InetTraits
>
1023 void tcp
<InetTraits
>::tcb::init_from_options(tcp_hdr
* th
, uint8_t* opt_start
, uint8_t* opt_end
) {
1024 // Handle tcp options
1025 _option
.parse(opt_start
, opt_end
);
1027 // Remote receive window scale factor
1028 _snd
.window_scale
= _option
._remote_win_scale
;
1029 // Local receive window scale factor
1030 _rcv
.window_scale
= _option
._local_win_scale
;
1032 // Maximum segment size remote can receive
1033 _snd
.mss
= _option
._remote_mss
;
1034 // Maximum segment size local can receive
1035 _rcv
.mss
= _option
._local_mss
= local_mss();
1037 // Linux's default window size
1038 _rcv
.window
= 29200 << _rcv
.window_scale
;
1039 _snd
.window
= th
->window
<< _snd
.window_scale
;
1041 // Segment sequence number used for last window update
1043 // Segment acknowledgment number used for last window update
1046 // Setup initial congestion window
1047 if (2190 < _snd
.mss
) {
1048 _snd
.cwnd
= 2 * _snd
.mss
;
1049 } else if (1095 < _snd
.mss
&& _snd
.mss
<= 2190) {
1050 _snd
.cwnd
= 3 * _snd
.mss
;
1052 _snd
.cwnd
= 4 * _snd
.mss
;
1055 // Setup initial slow start threshold
1056 _snd
.ssthresh
= th
->window
<< _snd
.window_scale
;
1059 template <typename InetTraits
>
1060 Packet tcp
<InetTraits
>::tcb::get_transmit_packet() {
1061 // easy case: empty queue
1062 if (_snd
.unsent
.empty()) {
1065 auto can_send
= this->can_send();
1066 // Max number of TCP payloads we can pass to NIC
1068 if (_tcp
.get_hw_features().tx_tso
) {
1069 // FIXME: Info tap device the size of the split packet
1070 len
= _tcp
.get_hw_features().max_packet_len
- tcp_hdr_len_min
- InetTraits::ip_hdr_len_min
;
1072 len
= std::min(uint16_t(_tcp
.get_hw_features().mtu
- tcp_hdr_len_min
- InetTraits::ip_hdr_len_min
), _snd
.mss
);
1074 can_send
= std::min(can_send
, len
);
1075 // easy case: one small packet
1076 if (_snd
.unsent
.front().len() <= can_send
) {
1077 auto p
= std::move(_snd
.unsent
.front());
1078 _snd
.unsent
.pop_front();
1079 _snd
.unsent_len
-= p
.len();
1082 // moderate case: need to split one packet
1083 if (_snd
.unsent
.front().len() > can_send
) {
1084 auto p
= _snd
.unsent
.front().share(0, can_send
);
1085 _snd
.unsent
.front().trim_front(can_send
);
1086 _snd
.unsent_len
-= p
.len();
1089 // hard case: merge some packets, possibly split last
1090 auto p
= std::move(_snd
.unsent
.front());
1091 _snd
.unsent
.pop_front();
1092 can_send
-= p
.len();
1093 while (!_snd
.unsent
.empty()
1094 && _snd
.unsent
.front().len() <= can_send
) {
1095 can_send
-= _snd
.unsent
.front().len();
1096 p
.append(std::move(_snd
.unsent
.front()));
1097 _snd
.unsent
.pop_front();
1099 // FIXME: this will result in calling "deleter" of packet which free managed objects
1101 // if (!_snd.unsent.empty() && can_send) {
1102 // auto& q = _snd.unsent.front();
1103 // p.append(q.share(0, can_send));
1104 // q.trim_front(can_send);
1106 _snd
.unsent_len
-= p
.len();
1110 template <typename InetTraits
>
1111 void tcp
<InetTraits
>::tcb::output_one(bool data_retransmit
) {
1112 if (in_state(CLOSED
)) {
1116 Packet p
= data_retransmit
? _snd
.data
.front().p
.share() : get_transmit_packet();
1117 Packet clone
= p
.share(); // early clone to prevent share() from calling packet::unuse_internal_data() on header.
1118 uint16_t len
= p
.len();
1119 bool syn_on
= syn_needs_on();
1120 bool ack_on
= ack_needs_on();
1122 auto options_size
= _option
.get_size(syn_on
, ack_on
);
1123 auto th
= p
.prepend_header
<tcp_hdr
>(options_size
);
1125 th
->src_port
= _local_port
;
1126 th
->dst_port
= _foreign_port
;
1131 clear_delayed_ack();
1137 if (data_retransmit
) {
1138 seq
= _snd
.unacknowledged
;
1140 seq
= syn_on
? _snd
.initial
: _snd
.next
;
1144 th
->ack
= _rcv
.next
;
1145 th
->data_offset
= (sizeof(*th
) + options_size
) / 4;
1146 th
->window
= _rcv
.window
>> _rcv
.window_scale
;
1149 // FIXME: does the FIN have to fit in the window?
1150 bool fin_on
= fin_needs_on();
1154 _option
.fill(th
, options_size
);
1159 uint16_t pseudo_hdr_seg_len
= 0;
1161 oi
.tcp_hdr_len
= sizeof(tcp_hdr
) + options_size
;
1163 if (_tcp
.get_hw_features().tx_csum_l4_offload
) {
1164 oi
.needs_csum
= true;
1167 // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's
1168 // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones'
1169 // complement sum of the pseudo header.
1171 // For TSO the csum should be calculated for a pseudo header with
1172 // segment length set to 0. All the rest is the same as for a TCP Tx
1173 // CSUM offload case.
1175 if (_tcp
.get_hw_features().tx_tso
&& len
> _snd
.mss
) {
1176 oi
.tso_seg_size
= _snd
.mss
;
1178 pseudo_hdr_seg_len
= sizeof(*th
) + options_size
+ len
;
1181 pseudo_hdr_seg_len
= sizeof(*th
) + options_size
+ len
;
1182 oi
.needs_csum
= false;
1185 InetTraits::tcp_pseudo_header_checksum(csum
, _local_ip
, _foreign_ip
,
1186 pseudo_hdr_seg_len
);
1188 if (_tcp
.get_hw_features().tx_csum_l4_offload
) {
1189 th
->checksum
= ~csum
.get();
1192 th
->checksum
= csum
.get();
1195 oi
.protocol
= ip_protocol_num::tcp
;
1197 p
.set_offload_info(oi
);
1199 if (!data_retransmit
&& (len
|| syn_on
|| fin_on
)) {
1200 auto now
= clock_type::now();
1202 unsigned nr_transmits
= 0;
1203 _snd
.data
.emplace_back(unacked_segment
{std::move(clone
),
1204 len
, nr_transmits
, now
});
1206 if (!retransmit_fd
) {
1207 start_retransmit_timer();
1211 queue_packet(std::move(p
));
1214 template <typename InetTraits
>
1215 bool tcp
<InetTraits
>::tcb::is_all_data_acked() {
1216 if (_snd
.data
.empty() && _snd
.unsent_len
== 0 && _snd
.queued_len
== 0) {
1222 template <typename InetTraits
>
1223 Tub
<Packet
> tcp
<InetTraits
>::tcb::read() {
1225 if (_rcv
.data
.empty())
1229 for (auto&& q
: _rcv
.data
) {
1230 p
->append(std::move(q
));
1236 template <typename InetTraits
>
1237 int tcp
<InetTraits
>::tcb::send(Packet p
) {
1238 // We can not send after the connection is closed
1239 ceph_assert(!_snd
.closed
);
1241 if (in_state(CLOSED
))
1245 if (!_snd
.user_queue_space
.get_or_fail(len
)) {
1246 // note: caller must ensure enough queue space to send
1249 // TODO: Handle p.len() > max user_queue_space case
1250 _snd
.queued_len
+= len
;
1251 _snd
.unsent_len
+= len
;
1252 _snd
.queued_len
-= len
;
1253 _snd
.unsent
.push_back(std::move(p
));
1254 if (can_send() > 0) {
1260 template <typename InetTraits
>
1261 void tcp
<InetTraits
>::tcb::close() {
1262 if (in_state(CLOSED
) || _snd
.closed
) {
1265 // TODO: We should make this asynchronous
1268 center
->delete_file_event(fd
, EVENT_READABLE
|EVENT_WRITABLE
);
1269 bool acked
= is_all_data_acked();
1271 _snd
._all_data_acked_fd
= manager
.get_eventfd();
1272 center
->create_file_event(_snd
._all_data_acked_fd
, EVENT_READABLE
, all_data_ack_event
);
1274 close_final_cleanup();
1278 template <typename InetTraits
>
1279 bool tcp
<InetTraits
>::tcb::should_send_ack(uint16_t seg_len
) {
1280 // We've received a TSO packet, do ack immediately
1281 if (seg_len
> _rcv
.mss
) {
1282 _nr_full_seg_received
= 0;
1283 if (_delayed_ack_fd
) {
1284 center
->delete_time_event(*_delayed_ack_fd
);
1285 _delayed_ack_fd
.destroy();
1290 // We've received a full sized segment, ack for every second full sized segment
1291 if (seg_len
== _rcv
.mss
) {
1292 if (_nr_full_seg_received
++ >= 1) {
1293 _nr_full_seg_received
= 0;
1294 if (_delayed_ack_fd
) {
1295 center
->delete_time_event(*_delayed_ack_fd
);
1296 _delayed_ack_fd
.destroy();
1302 // If the timer is armed and its callback hasn't been run.
1303 if (_delayed_ack_fd
) {
1307 // If the timer is not armed, schedule a delayed ACK.
1308 // The maximum delayed ack timer allowed by RFC1122 is 500ms, most
1309 // implementations use 200ms.
1310 _delayed_ack_fd
.construct(center
->create_time_event(200*1000, delayed_ack_event
));
1314 template <typename InetTraits
>
1315 void tcp
<InetTraits
>::tcb::clear_delayed_ack() {
1316 if (_delayed_ack_fd
) {
1317 center
->delete_time_event(*_delayed_ack_fd
);
1318 _delayed_ack_fd
.destroy();
1322 template <typename InetTraits
>
1323 bool tcp
<InetTraits
>::tcb::merge_out_of_order() {
1324 bool merged
= false;
1325 if (_rcv
.out_of_order
.map
.empty()) {
1328 for (auto it
= _rcv
.out_of_order
.map
.begin(); it
!= _rcv
.out_of_order
.map
.end();) {
1329 auto& p
= it
->second
;
1330 auto seg_beg
= it
->first
;
1331 auto seg_len
= p
.len();
1332 auto seg_end
= seg_beg
+ seg_len
;
1333 if (seg_beg
<= _rcv
.next
&& seg_end
> _rcv
.next
) {
1334 // This segment has been received out of order and its previous
1335 // segment has been received now
1336 auto trim
= _rcv
.next
- seg_beg
;
1341 _rcv
.next
+= seg_len
;
1342 _rcv
.data
.push_back(std::move(p
));
1343 // Since c++11, erase() always returns the value of the following element
1344 it
= _rcv
.out_of_order
.map
.erase(it
);
1346 } else if (_rcv
.next
>= seg_end
) {
1347 // This segment has been receive already, drop it
1348 it
= _rcv
.out_of_order
.map
.erase(it
);
1350 // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only,
1351 // so we can stop looking here.
1359 template <typename InetTraits
>
1360 void tcp
<InetTraits
>::tcb::insert_out_of_order(tcp_sequence seg
, Packet p
) {
1361 _rcv
.out_of_order
.merge(seg
, std::move(p
));
1364 template <typename InetTraits
>
1365 void tcp
<InetTraits
>::tcb::trim_receive_data_after_window() {
1369 template <typename InetTraits
>
1370 void tcp
<InetTraits
>::tcb::fast_retransmit() {
1371 if (!_snd
.data
.empty()) {
1372 auto& unacked_seg
= _snd
.data
.front();
1373 unacked_seg
.nr_transmits
++;
1379 template <typename InetTraits
>
1380 void tcp
<InetTraits
>::tcb::update_rto(clock_type::time_point tx_time
) {
1381 // Update RTO according to RFC6298
1382 auto R
= std::chrono::duration_cast
<std::chrono::microseconds
>(clock_type::now() - tx_time
);
1383 if (_snd
.first_rto_sample
) {
1384 _snd
.first_rto_sample
= false;
1387 _snd
.rttvar
= R
/ 2;
1390 // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
1391 // SRTT <- (1 - alpha) * SRTT + alpha * R'
1392 // where alpha = 1/8 and beta = 1/4
1393 auto delta
= _snd
.srtt
> R
? (_snd
.srtt
- R
) : (R
- _snd
.srtt
);
1394 _snd
.rttvar
= _snd
.rttvar
* 3 / 4 + delta
/ 4;
1395 _snd
.srtt
= _snd
.srtt
* 7 / 8 + R
/ 8;
1397 // RTO <- SRTT + max(G, K * RTTVAR)
1398 _rto
= _snd
.srtt
+ std::max(_rto_clk_granularity
, 4 * _snd
.rttvar
);
1400 // Make sure 1 sec << _rto << 60 sec
1401 _rto
= std::max(_rto
, _rto_min
);
1402 _rto
= std::min(_rto
, _rto_max
);
1405 template <typename InetTraits
>
1406 void tcp
<InetTraits
>::tcb::update_cwnd(uint32_t acked_bytes
) {
1407 uint32_t smss
= _snd
.mss
;
1408 if (_snd
.cwnd
< _snd
.ssthresh
) {
1409 // In slow start phase
1410 _snd
.cwnd
+= std::min(acked_bytes
, smss
);
1412 // In congestion avoidance phase
1413 uint32_t round_up
= 1;
1414 _snd
.cwnd
+= std::max(round_up
, smss
* smss
/ _snd
.cwnd
);
1419 template <typename InetTraits
>
1420 void tcp
<InetTraits
>::tcb::cleanup() {
1421 manager
.notify(fd
, EVENT_READABLE
);
1423 _snd
.unsent
.clear();
1425 _rcv
.out_of_order
.map
.clear();
1427 stop_retransmit_timer();
1428 clear_delayed_ack();
1429 center
->dispatch_event_external(new tcp
<InetTraits
>::C_actual_remove_tcb(this));
1433 template <typename InetTraits
>
1434 tcp_sequence tcp
<InetTraits
>::tcb::get_isn() {
1435 // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers
1436 // with the expression:
1437 // ISN = M + F(localip, localport, remoteip, remoteport, secretkey)
1438 // M is the 4 microsecond timer
1439 using namespace std::chrono
;
1441 hash
[0] = _local_ip
.ip
;
1442 hash
[1] = _foreign_ip
.ip
;
1443 hash
[2] = (_local_port
<< 16) + _foreign_port
;
1444 hash
[3] = _isn_secret
.key
[15];
1445 ceph::crypto::MD5 md5
;
1446 md5
.Update((const unsigned char*)_isn_secret
.key
, sizeof(_isn_secret
.key
));
1447 md5
.Final((unsigned char*)hash
);
1449 auto m
= duration_cast
<microseconds
>(clock_type::now().time_since_epoch());
1450 seq
+= m
.count() / 4;
1451 return make_seq(seq
);
1454 template <typename InetTraits
>
1455 Tub
<typename
InetTraits::l4packet
> tcp
<InetTraits
>::tcb::get_packet() {
1456 _poll_active
= false;
1457 if (_packetq
.empty()) {
1461 Tub
<typename
InetTraits::l4packet
> p
;
1462 if (in_state(CLOSED
)) {
1466 ceph_assert(!_packetq
.empty());
1468 p
= std::move(_packetq
.front());
1469 _packetq
.pop_front();
1470 if (!_packetq
.empty() || (_snd
.dupacks
< 3 && can_send() > 0)) {
1471 // If there are packets to send in the queue or tcb is allowed to send
1472 // more add tcp back to polling set to keep sending. In addition, dupacks >= 3
1473 // is an indication that an segment is lost, stop sending more in this case.
1479 template <typename InetTraits
>
1480 void tcp
<InetTraits
>::connection::close_read() {
1482 // _tcb->manager.notify(_tcb->fd, EVENT_READABLE);
1485 template <typename InetTraits
>
1486 void tcp
<InetTraits
>::connection::close_write() {
1490 template <typename InetTraits
>
1491 constexpr uint16_t tcp
<InetTraits
>::tcb::_max_nr_retransmit
;
1493 template <typename InetTraits
>
1494 constexpr std::chrono::microseconds tcp
<InetTraits
>::tcb::_rto_min
;
1496 template <typename InetTraits
>
1497 constexpr std::chrono::microseconds tcp
<InetTraits
>::tcb::_rto_max
;
1499 template <typename InetTraits
>
1500 constexpr std::chrono::microseconds tcp
<InetTraits
>::tcb::_rto_clk_granularity
;
1502 template <typename InetTraits
>
1503 typename tcp
<InetTraits
>::tcb::isn_secret tcp
<InetTraits
>::tcb::_isn_secret
;
1506 #endif /* TCP_HH_ */