]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | /* | |
3 | * This file is open source software, licensed to you under the terms | |
4 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
5 | * distributed with this work for additional information regarding copyright | |
6 | * ownership. You may not use this file except in compliance with the License. | |
7 | * | |
8 | * You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | /* | |
20 | * Copyright (C) 2014 Cloudius Systems, Ltd. | |
21 | */ | |
22 | ||
23 | #ifndef CEPH_DPDK_TCP_H_ | |
24 | #define CEPH_DPDK_TCP_H_ | |
25 | ||
26 | #include <unordered_map> | |
27 | #include <map> | |
28 | #include <queue> | |
29 | #include <functional> | |
30 | #include <deque> | |
31 | #include <chrono> | |
32 | #include <random> | |
33 | #include <stdexcept> | |
34 | #include <system_error> | |
35 | ||
36 | #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1 | |
37 | #include <cryptopp/md5.h> | |
38 | ||
39 | #include "msg/async/dpdk/EventDPDK.h" | |
40 | ||
41 | #include "include/utime.h" | |
42 | #include "common/Throttle.h" | |
43 | #include "common/ceph_time.h" | |
44 | #include "msg/async/Event.h" | |
45 | #include "IPChecksum.h" | |
46 | #include "IP.h" | |
47 | #include "const.h" | |
48 | #include "byteorder.h" | |
49 | #include "shared_ptr.h" | |
50 | #include "PacketUtil.h" | |
51 | ||
52 | struct tcp_hdr; | |
53 | ||
// TCP connection states (RFC 793). Every state occupies its own bit so that
// several states can be combined into a mask with operator| and tested with
// a single bitwise AND.
enum class tcp_state : uint16_t {
  CLOSED       = 0x0001,
  LISTEN       = 0x0002,
  SYN_SENT     = 0x0004,
  SYN_RECEIVED = 0x0008,
  ESTABLISHED  = 0x0010,
  FIN_WAIT_1   = 0x0020,
  FIN_WAIT_2   = 0x0040,
  CLOSE_WAIT   = 0x0080,
  CLOSING      = 0x0100,
  LAST_ACK     = 0x0200,
  TIME_WAIT    = 0x0400
};

// Combine two states (or state masks) into a mask containing both.
inline tcp_state operator|(tcp_state lhs, tcp_state rhs) {
  const auto bits = static_cast<uint16_t>(lhs) | static_cast<uint16_t>(rhs);
  return static_cast<tcp_state>(bits);
}

// Print the name of a single state; combined masks and any other value
// print as "UNKNOWN".
inline std::ostream & operator<<(std::ostream & str, const tcp_state& s) {
  const char* name = "UNKNOWN";
  switch (s) {
  case tcp_state::CLOSED:       name = "CLOSED"; break;
  case tcp_state::LISTEN:       name = "LISTEN"; break;
  case tcp_state::SYN_SENT:     name = "SYN_SENT"; break;
  case tcp_state::SYN_RECEIVED: name = "SYN_RECEIVED"; break;
  case tcp_state::ESTABLISHED:  name = "ESTABLISHED"; break;
  case tcp_state::FIN_WAIT_1:   name = "FIN_WAIT_1"; break;
  case tcp_state::FIN_WAIT_2:   name = "FIN_WAIT_2"; break;
  case tcp_state::CLOSE_WAIT:   name = "CLOSE_WAIT"; break;
  case tcp_state::CLOSING:      name = "CLOSING"; break;
  case tcp_state::LAST_ACK:     name = "LAST_ACK"; break;
  case tcp_state::TIME_WAIT:    name = "TIME_WAIT"; break;
  }
  return str << name;
}
88 | ||
89 | struct tcp_option { | |
90 | // The kind and len field are fixed and defined in TCP protocol | |
91 | enum class option_kind: uint8_t { mss = 2, win_scale = 3, sack = 4, timestamps = 8, nop = 1, eol = 0 }; | |
92 | enum class option_len: uint8_t { mss = 4, win_scale = 3, sack = 2, timestamps = 10, nop = 1, eol = 1 }; | |
93 | struct mss { | |
94 | option_kind kind = option_kind::mss; | |
95 | option_len len = option_len::mss; | |
96 | uint16_t mss; | |
97 | struct mss hton() { | |
98 | struct mss m = *this; | |
99 | m.mss = ::hton(m.mss); | |
100 | return m; | |
101 | } | |
102 | } __attribute__((packed)); | |
103 | struct win_scale { | |
104 | option_kind kind = option_kind::win_scale; | |
105 | option_len len = option_len::win_scale; | |
106 | uint8_t shift; | |
107 | } __attribute__((packed)); | |
108 | struct sack { | |
109 | option_kind kind = option_kind::sack; | |
110 | option_len len = option_len::sack; | |
111 | } __attribute__((packed)); | |
112 | struct timestamps { | |
113 | option_kind kind = option_kind::timestamps; | |
114 | option_len len = option_len::timestamps; | |
115 | uint32_t t1; | |
116 | uint32_t t2; | |
117 | } __attribute__((packed)); | |
118 | struct nop { | |
119 | option_kind kind = option_kind::nop; | |
120 | } __attribute__((packed)); | |
121 | struct eol { | |
122 | option_kind kind = option_kind::eol; | |
123 | } __attribute__((packed)); | |
124 | static const uint8_t align = 4; | |
125 | ||
126 | void parse(uint8_t* beg, uint8_t* end); | |
127 | uint8_t fill(tcp_hdr* th, uint8_t option_size); | |
128 | uint8_t get_size(bool syn_on, bool ack_on); | |
129 | ||
130 | // For option negotiattion | |
131 | bool _mss_received = false; | |
132 | bool _win_scale_received = false; | |
133 | bool _timestamps_received = false; | |
134 | bool _sack_received = false; | |
135 | ||
136 | // Option data | |
137 | uint16_t _remote_mss = 536; | |
138 | uint16_t _local_mss; | |
139 | uint8_t _remote_win_scale = 0; | |
140 | uint8_t _local_win_scale = 0; | |
141 | }; | |
142 | inline uint8_t*& operator+=(uint8_t*& x, tcp_option::option_len len) { x += uint8_t(len); return x; } | |
143 | inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len); return x; } | |
144 | ||
// A 32-bit TCP sequence number wrapped in its own type so that the modular
// (wrap-around) arithmetic and ordering operators apply only to sequence
// numbers. Byte order of `raw` depends on context (see hton/ntoh).
struct tcp_sequence { uint32_t raw; };
148 | ||
149 | tcp_sequence ntoh(tcp_sequence ts) { | |
150 | return tcp_sequence { ::ntoh(ts.raw) }; | |
151 | } | |
152 | ||
153 | tcp_sequence hton(tcp_sequence ts) { | |
154 | return tcp_sequence { ::hton(ts.raw) }; | |
155 | } | |
156 | ||
31f18b77 | 157 | inline std::ostream& operator<<(std::ostream& os, const tcp_sequence& s) { |
7c673cae FG |
158 | return os << s.raw; |
159 | } | |
160 | ||
161 | inline tcp_sequence make_seq(uint32_t raw) { return tcp_sequence{raw}; } | |
162 | inline tcp_sequence& operator+=(tcp_sequence& s, int32_t n) { s.raw += n; return s; } | |
163 | inline tcp_sequence& operator-=(tcp_sequence& s, int32_t n) { s.raw -= n; return s; } | |
164 | inline tcp_sequence operator+(tcp_sequence s, int32_t n) { return s += n; } | |
165 | inline tcp_sequence operator-(tcp_sequence s, int32_t n) { return s -= n; } | |
166 | inline int32_t operator-(tcp_sequence s, tcp_sequence q) { return s.raw - q.raw; } | |
167 | inline bool operator==(tcp_sequence s, tcp_sequence q) { return s.raw == q.raw; } | |
168 | inline bool operator!=(tcp_sequence s, tcp_sequence q) { return !(s == q); } | |
169 | inline bool operator<(tcp_sequence s, tcp_sequence q) { return s - q < 0; } | |
170 | inline bool operator>(tcp_sequence s, tcp_sequence q) { return q < s; } | |
171 | inline bool operator<=(tcp_sequence s, tcp_sequence q) { return !(s > q); } | |
172 | inline bool operator>=(tcp_sequence s, tcp_sequence q) { return !(s < q); } | |
173 | ||
174 | struct tcp_hdr { | |
175 | uint16_t src_port; | |
176 | uint16_t dst_port; | |
177 | tcp_sequence seq; | |
178 | tcp_sequence ack; | |
179 | uint8_t rsvd1 : 4; | |
180 | uint8_t data_offset : 4; | |
181 | uint8_t f_fin : 1; | |
182 | uint8_t f_syn : 1; | |
183 | uint8_t f_rst : 1; | |
184 | uint8_t f_psh : 1; | |
185 | uint8_t f_ack : 1; | |
186 | uint8_t f_urg : 1; | |
187 | uint8_t rsvd2 : 2; | |
188 | uint16_t window; | |
189 | uint16_t checksum; | |
190 | uint16_t urgent; | |
191 | ||
192 | tcp_hdr hton() { | |
193 | tcp_hdr hdr = *this; | |
194 | hdr.src_port = ::hton(src_port); | |
195 | hdr.dst_port = ::hton(dst_port); | |
196 | hdr.seq = ::hton(seq); | |
197 | hdr.ack = ::hton(ack); | |
198 | hdr.window = ::hton(window); | |
199 | hdr.checksum = ::hton(checksum); | |
200 | hdr.urgent = ::hton(urgent); | |
201 | return hdr; | |
202 | } | |
203 | ||
204 | tcp_hdr ntoh() { | |
205 | tcp_hdr hdr = *this; | |
206 | hdr.src_port = ::ntoh(src_port); | |
207 | hdr.dst_port = ::ntoh(dst_port); | |
208 | hdr.seq = ::ntoh(seq); | |
209 | hdr.ack = ::ntoh(ack); | |
210 | hdr.window = ::ntoh(window); | |
211 | hdr.checksum = ::ntoh(checksum); | |
212 | hdr.urgent = ::ntoh(urgent); | |
213 | return hdr; | |
214 | } | |
215 | } __attribute__((packed)); | |
216 | ||
217 | struct tcp_tag {}; | |
218 | using tcp_packet_merger = packet_merger<tcp_sequence, tcp_tag>; | |
219 | ||
220 | template <typename InetTraits> | |
221 | class tcp { | |
222 | public: | |
223 | using ipaddr = typename InetTraits::address_type; | |
224 | using inet_type = typename InetTraits::inet_type; | |
225 | using connid = l4connid<InetTraits>; | |
226 | using connid_hash = typename connid::connid_hash; | |
227 | class connection; | |
228 | class listener; | |
229 | private: | |
230 | class tcb; | |
231 | ||
232 | class C_handle_delayed_ack : public EventCallback { | |
233 | tcb *tc; | |
234 | ||
235 | public: | |
236 | C_handle_delayed_ack(tcb *t): tc(t) { } | |
237 | void do_request(int r) { | |
238 | tc->_nr_full_seg_received = 0; | |
239 | tc->output(); | |
240 | } | |
241 | }; | |
242 | ||
243 | class C_handle_retransmit : public EventCallback { | |
244 | tcb *tc; | |
245 | ||
246 | public: | |
247 | C_handle_retransmit(tcb *t): tc(t) { } | |
248 | void do_request(int r) { | |
249 | tc->retransmit(); | |
250 | } | |
251 | }; | |
252 | ||
253 | class C_handle_persist : public EventCallback { | |
254 | tcb *tc; | |
255 | ||
256 | public: | |
257 | C_handle_persist(tcb *t): tc(t) { } | |
258 | void do_request(int r) { | |
259 | tc->persist(); | |
260 | } | |
261 | }; | |
262 | ||
263 | class C_all_data_acked : public EventCallback { | |
264 | tcb *tc; | |
265 | ||
266 | public: | |
267 | C_all_data_acked(tcb *t): tc(t) {} | |
268 | void do_request(int fd_or_id) { | |
269 | tc->close_final_cleanup(); | |
270 | } | |
271 | }; | |
272 | ||
273 | class C_actual_remove_tcb : public EventCallback { | |
274 | lw_shared_ptr<tcb> tc; | |
275 | public: | |
276 | C_actual_remove_tcb(tcb *t): tc(t->shared_from_this()) {} | |
277 | void do_request(int r) { | |
278 | delete this; | |
279 | } | |
280 | }; | |
281 | ||
282 | class tcb : public enable_lw_shared_from_this<tcb> { | |
283 | using clock_type = ceph::coarse_real_clock; | |
284 | static constexpr tcp_state CLOSED = tcp_state::CLOSED; | |
285 | static constexpr tcp_state LISTEN = tcp_state::LISTEN; | |
286 | static constexpr tcp_state SYN_SENT = tcp_state::SYN_SENT; | |
287 | static constexpr tcp_state SYN_RECEIVED = tcp_state::SYN_RECEIVED; | |
288 | static constexpr tcp_state ESTABLISHED = tcp_state::ESTABLISHED; | |
289 | static constexpr tcp_state FIN_WAIT_1 = tcp_state::FIN_WAIT_1; | |
290 | static constexpr tcp_state FIN_WAIT_2 = tcp_state::FIN_WAIT_2; | |
291 | static constexpr tcp_state CLOSE_WAIT = tcp_state::CLOSE_WAIT; | |
292 | static constexpr tcp_state CLOSING = tcp_state::CLOSING; | |
293 | static constexpr tcp_state LAST_ACK = tcp_state::LAST_ACK; | |
294 | static constexpr tcp_state TIME_WAIT = tcp_state::TIME_WAIT; | |
295 | tcp_state _state = CLOSED; | |
296 | tcp& _tcp; | |
297 | UserspaceEventManager &manager; | |
298 | connection* _conn = nullptr; | |
299 | bool _connect_done = false; | |
300 | ipaddr _local_ip; | |
301 | ipaddr _foreign_ip; | |
302 | uint16_t _local_port; | |
303 | uint16_t _foreign_port; | |
304 | struct unacked_segment { | |
305 | Packet p; | |
306 | uint16_t data_len; | |
307 | unsigned nr_transmits; | |
308 | clock_type::time_point tx_time; | |
309 | }; | |
310 | struct send { | |
311 | tcp_sequence unacknowledged; | |
312 | tcp_sequence next; | |
313 | uint32_t window; | |
314 | uint8_t window_scale; | |
315 | uint16_t mss; | |
316 | tcp_sequence urgent; | |
317 | tcp_sequence wl1; | |
318 | tcp_sequence wl2; | |
319 | tcp_sequence initial; | |
320 | std::deque<unacked_segment> data; | |
321 | std::deque<Packet> unsent; | |
322 | uint32_t unsent_len = 0; | |
323 | uint32_t queued_len = 0; | |
324 | bool closed = false; | |
325 | // Wait for all data are acked | |
326 | int _all_data_acked_fd = -1; | |
327 | // Limit number of data queued into send queue | |
328 | Throttle user_queue_space; | |
329 | // Round-trip time variation | |
330 | std::chrono::microseconds rttvar; | |
331 | // Smoothed round-trip time | |
332 | std::chrono::microseconds srtt; | |
333 | bool first_rto_sample = true; | |
334 | clock_type::time_point syn_tx_time; | |
335 | // Congestion window | |
336 | uint32_t cwnd; | |
337 | // Slow start threshold | |
338 | uint32_t ssthresh; | |
339 | // Duplicated ACKs | |
340 | uint16_t dupacks = 0; | |
341 | unsigned syn_retransmit = 0; | |
342 | unsigned fin_retransmit = 0; | |
343 | uint32_t limited_transfer = 0; | |
344 | uint32_t partial_ack = 0; | |
345 | tcp_sequence recover; | |
346 | bool window_probe = false; | |
347 | send(CephContext *c): user_queue_space(c, "DPDK::tcp::tcb::user_queue_space", 81920) {} | |
348 | } _snd; | |
349 | struct receive { | |
350 | tcp_sequence next; | |
351 | uint32_t window; | |
352 | uint8_t window_scale; | |
353 | uint16_t mss; | |
354 | tcp_sequence urgent; | |
355 | tcp_sequence initial; | |
356 | std::deque<Packet> data; | |
357 | tcp_packet_merger out_of_order; | |
358 | } _rcv; | |
359 | EventCenter *center; | |
360 | int fd; | |
361 | // positive means no errno, 0 means eof, nagetive means error | |
362 | int16_t _errno = 1; | |
363 | tcp_option _option; | |
364 | EventCallbackRef delayed_ack_event; | |
365 | Tub<uint64_t> _delayed_ack_fd; | |
366 | // Retransmission timeout | |
367 | std::chrono::microseconds _rto{1000*1000}; | |
368 | std::chrono::microseconds _persist_time_out{1000*1000}; | |
369 | static constexpr std::chrono::microseconds _rto_min{1000*1000}; | |
370 | static constexpr std::chrono::microseconds _rto_max{60000*1000}; | |
371 | // Clock granularity | |
372 | static constexpr std::chrono::microseconds _rto_clk_granularity{1000}; | |
373 | static constexpr uint16_t _max_nr_retransmit{5}; | |
374 | EventCallbackRef retransmit_event; | |
375 | Tub<uint64_t> retransmit_fd; | |
376 | EventCallbackRef persist_event; | |
377 | EventCallbackRef all_data_ack_event; | |
378 | Tub<uint64_t> persist_fd; | |
379 | uint16_t _nr_full_seg_received = 0; | |
380 | struct isn_secret { | |
381 | // 512 bits secretkey for ISN generating | |
382 | uint32_t key[16]; | |
383 | isn_secret () { | |
384 | std::random_device rd; | |
385 | std::default_random_engine e(rd()); | |
386 | std::uniform_int_distribution<uint32_t> dist{}; | |
387 | for (auto& k : key) { | |
388 | k = dist(e); | |
389 | } | |
390 | } | |
391 | }; | |
392 | static isn_secret _isn_secret; | |
393 | tcp_sequence get_isn(); | |
394 | circular_buffer<typename InetTraits::l4packet> _packetq; | |
395 | bool _poll_active = false; | |
396 | public: | |
397 | // callback | |
398 | void close_final_cleanup(); | |
399 | ostream& _prefix(std::ostream *_dout); | |
400 | ||
401 | public: | |
402 | tcb(tcp& t, connid id); | |
403 | ~tcb(); | |
404 | void input_handle_listen_state(tcp_hdr* th, Packet p); | |
405 | void input_handle_syn_sent_state(tcp_hdr* th, Packet p); | |
406 | void input_handle_other_state(tcp_hdr* th, Packet p); | |
407 | void output_one(bool data_retransmit = false); | |
408 | bool is_all_data_acked(); | |
409 | int send(Packet p); | |
410 | void connect(); | |
411 | Tub<Packet> read(); | |
412 | void close(); | |
413 | void remove_from_tcbs() { | |
414 | auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port}; | |
415 | _tcp._tcbs.erase(id); | |
416 | } | |
417 | Tub<typename InetTraits::l4packet> get_packet(); | |
418 | void output() { | |
419 | if (!_poll_active) { | |
420 | _poll_active = true; | |
421 | ||
422 | auto tcb = this->shared_from_this(); | |
423 | _tcp._inet.wait_l2_dst_address(_foreign_ip, Packet(), [tcb] (const ethernet_address &dst, Packet p, int r) { | |
424 | if (r == 0) { | |
425 | tcb->_tcp.poll_tcb(dst, std::move(tcb)); | |
426 | } else if (r == -ETIMEDOUT) { | |
427 | // in other states connection should time out | |
428 | if (tcb->in_state(SYN_SENT)) { | |
429 | tcb->_errno = -ETIMEDOUT; | |
430 | tcb->cleanup(); | |
431 | } | |
432 | } else if (r == -EBUSY) { | |
433 | // retry later | |
434 | tcb->_poll_active = false; | |
435 | tcb->start_retransmit_timer(); | |
436 | } | |
437 | }); | |
438 | } | |
439 | } | |
440 | ||
441 | int16_t get_errno() const { | |
442 | return _errno; | |
443 | } | |
444 | ||
445 | tcp_state& state() { | |
446 | return _state; | |
447 | } | |
448 | ||
449 | uint64_t peek_sent_available() { | |
450 | if (!in_state(ESTABLISHED)) | |
451 | return 0; | |
452 | uint64_t left = _snd.user_queue_space.get_max() - _snd.user_queue_space.get_current(); | |
453 | return left; | |
454 | } | |
455 | ||
456 | int is_connected() const { | |
457 | if (_errno <= 0) | |
458 | return _errno; | |
459 | return _connect_done; | |
460 | } | |
461 | ||
462 | private: | |
463 | void respond_with_reset(tcp_hdr* th); | |
464 | bool merge_out_of_order(); | |
465 | void insert_out_of_order(tcp_sequence seq, Packet p); | |
466 | void trim_receive_data_after_window(); | |
467 | bool should_send_ack(uint16_t seg_len); | |
468 | void clear_delayed_ack(); | |
469 | Packet get_transmit_packet(); | |
470 | void retransmit_one() { | |
471 | bool data_retransmit = true; | |
472 | output_one(data_retransmit); | |
473 | } | |
474 | void start_retransmit_timer() { | |
475 | if (retransmit_fd) | |
476 | center->delete_time_event(*retransmit_fd); | |
477 | retransmit_fd.construct(center->create_time_event(_rto.count(), retransmit_event)); | |
478 | }; | |
479 | void stop_retransmit_timer() { | |
480 | if (retransmit_fd) { | |
481 | center->delete_time_event(*retransmit_fd); | |
482 | retransmit_fd.destroy(); | |
483 | } | |
484 | }; | |
485 | void start_persist_timer() { | |
486 | if (persist_fd) | |
487 | center->delete_time_event(*persist_fd); | |
488 | persist_fd.construct(center->create_time_event(_persist_time_out.count(), persist_event)); | |
489 | }; | |
490 | void stop_persist_timer() { | |
491 | if (persist_fd) { | |
492 | center->delete_time_event(*persist_fd); | |
493 | persist_fd.destroy(); | |
494 | } | |
495 | }; | |
496 | void persist(); | |
497 | void retransmit(); | |
498 | void fast_retransmit(); | |
499 | void update_rto(clock_type::time_point tx_time); | |
500 | void update_cwnd(uint32_t acked_bytes); | |
501 | void cleanup(); | |
502 | uint32_t can_send() { | |
503 | if (_snd.window_probe) { | |
504 | return 1; | |
505 | } | |
506 | // Can not send more than advertised window allows | |
507 | auto x = std::min(uint32_t(_snd.unacknowledged + _snd.window - _snd.next), _snd.unsent_len); | |
508 | // Can not send more than congestion window allows | |
509 | x = std::min(_snd.cwnd, x); | |
510 | if (_snd.dupacks == 1 || _snd.dupacks == 2) { | |
511 | // RFC5681 Step 3.1 | |
512 | // Send cwnd + 2 * smss per RFC3042 | |
513 | auto flight = flight_size(); | |
514 | auto max = _snd.cwnd + 2 * _snd.mss; | |
515 | x = flight <= max ? std::min(x, max - flight) : 0; | |
516 | _snd.limited_transfer += x; | |
517 | } else if (_snd.dupacks >= 3) { | |
518 | // RFC5681 Step 3.5 | |
519 | // Sent 1 full-sized segment at most | |
520 | x = std::min(uint32_t(_snd.mss), x); | |
521 | } | |
522 | return x; | |
523 | } | |
524 | uint32_t flight_size() { | |
525 | uint32_t size = 0; | |
526 | std::for_each(_snd.data.begin(), _snd.data.end(), | |
527 | [&] (unacked_segment& seg) { size += seg.p.len(); }); | |
528 | return size; | |
529 | } | |
530 | uint16_t local_mss() { | |
531 | return _tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min; | |
532 | } | |
533 | void queue_packet(Packet p) { | |
534 | _packetq.emplace_back( | |
535 | typename InetTraits::l4packet{_foreign_ip, std::move(p)}); | |
536 | } | |
537 | void signal_data_received() { | |
538 | manager.notify(fd, EVENT_READABLE); | |
539 | } | |
540 | void signal_all_data_acked() { | |
541 | if (_snd._all_data_acked_fd >= 0 && _snd.unsent_len == 0 && _snd.queued_len == 0) | |
542 | manager.notify(_snd._all_data_acked_fd, EVENT_READABLE); | |
543 | } | |
544 | void do_syn_sent() { | |
545 | _state = SYN_SENT; | |
546 | _snd.syn_tx_time = clock_type::now(); | |
547 | // Send <SYN> to remote | |
548 | output(); | |
549 | } | |
550 | void do_syn_received() { | |
551 | _state = SYN_RECEIVED; | |
552 | _snd.syn_tx_time = clock_type::now(); | |
553 | // Send <SYN,ACK> to remote | |
554 | output(); | |
555 | } | |
556 | void do_established() { | |
557 | _state = ESTABLISHED; | |
558 | update_rto(_snd.syn_tx_time); | |
559 | _connect_done = true; | |
560 | manager.notify(fd, EVENT_READABLE|EVENT_WRITABLE); | |
561 | } | |
562 | void do_reset() { | |
563 | _state = CLOSED; | |
564 | // Free packets to be sent which are waiting for user_queue_space | |
565 | _snd.user_queue_space.reset(); | |
566 | cleanup(); | |
567 | _errno = -ECONNRESET; | |
568 | manager.notify(fd, EVENT_READABLE); | |
569 | ||
570 | if (_snd._all_data_acked_fd >= 0) | |
571 | manager.notify(_snd._all_data_acked_fd, EVENT_READABLE); | |
572 | } | |
573 | void do_time_wait() { | |
574 | // FIXME: Implement TIME_WAIT state timer | |
575 | _state = TIME_WAIT; | |
576 | cleanup(); | |
577 | } | |
578 | void do_closed() { | |
579 | _state = CLOSED; | |
580 | cleanup(); | |
581 | } | |
582 | void do_setup_isn() { | |
583 | _snd.initial = get_isn(); | |
584 | _snd.unacknowledged = _snd.initial; | |
585 | _snd.next = _snd.initial + 1; | |
586 | _snd.recover = _snd.initial; | |
587 | } | |
588 | void do_local_fin_acked() { | |
589 | _snd.unacknowledged += 1; | |
590 | _snd.next += 1; | |
591 | } | |
592 | bool syn_needs_on() { | |
593 | return in_state(SYN_SENT | SYN_RECEIVED); | |
594 | } | |
595 | bool fin_needs_on() { | |
596 | return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed && | |
597 | _snd.unsent_len == 0 && _snd.queued_len == 0; | |
598 | } | |
599 | bool ack_needs_on() { | |
600 | return !in_state(CLOSED | LISTEN | SYN_SENT); | |
601 | } | |
602 | bool foreign_will_not_send() { | |
603 | return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED); | |
604 | } | |
605 | bool in_state(tcp_state state) { | |
606 | return uint16_t(_state) & uint16_t(state); | |
607 | } | |
608 | void exit_fast_recovery() { | |
609 | _snd.dupacks = 0; | |
610 | _snd.limited_transfer = 0; | |
611 | _snd.partial_ack = 0; | |
612 | } | |
613 | uint32_t data_segment_acked(tcp_sequence seg_ack); | |
614 | bool segment_acceptable(tcp_sequence seg_seq, unsigned seg_len); | |
615 | void init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end); | |
616 | friend class connection; | |
617 | ||
618 | friend class C_handle_delayed_ack; | |
619 | friend class C_handle_retransmit; | |
620 | friend class C_handle_persist; | |
621 | friend class C_all_data_acked; | |
622 | }; | |
623 | ||
624 | CephContext *cct; | |
625 | // ipv4_l4<ip_protocol_num::tcp> | |
626 | inet_type& _inet; | |
627 | EventCenter *center; | |
628 | UserspaceEventManager &manager; | |
629 | std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs; | |
630 | std::unordered_map<uint16_t, listener*> _listening; | |
631 | std::random_device _rd; | |
632 | std::default_random_engine _e; | |
633 | std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535}; | |
634 | circular_buffer<std::pair<lw_shared_ptr<tcb>, ethernet_address>> _poll_tcbs; | |
635 | // queue for packets that do not belong to any tcb | |
636 | circular_buffer<ipv4_traits::l4packet> _packetq; | |
637 | Throttle _queue_space; | |
638 | // Limit number of data queued into send queue | |
639 | public: | |
640 | class connection { | |
641 | lw_shared_ptr<tcb> _tcb; | |
642 | public: | |
643 | explicit connection(lw_shared_ptr<tcb> tcbp) : _tcb(std::move(tcbp)) { _tcb->_conn = this; } | |
644 | connection(const connection&) = delete; | |
645 | connection(connection&& x) noexcept : _tcb(std::move(x._tcb)) { | |
646 | _tcb->_conn = this; | |
647 | } | |
648 | ~connection(); | |
649 | void operator=(const connection&) = delete; | |
650 | connection& operator=(connection&& x) { | |
651 | if (this != &x) { | |
652 | this->~connection(); | |
653 | new (this) connection(std::move(x)); | |
654 | } | |
655 | return *this; | |
656 | } | |
657 | int fd() const { | |
658 | return _tcb->fd; | |
659 | } | |
660 | int send(Packet p) { | |
661 | return _tcb->send(std::move(p)); | |
662 | } | |
663 | Tub<Packet> read() { | |
664 | return _tcb->read(); | |
665 | } | |
666 | int16_t get_errno() const { | |
667 | return _tcb->get_errno(); | |
668 | } | |
669 | void close_read(); | |
670 | void close_write(); | |
671 | entity_addr_t remote_addr() const { | |
672 | entity_addr_t addr; | |
673 | auto net_ip = _tcb->_foreign_ip.hton(); | |
674 | memcpy((void*)&addr.in4_addr().sin_addr.s_addr, | |
675 | &net_ip, sizeof(addr.in4_addr().sin_addr.s_addr)); | |
676 | addr.set_family(AF_INET); | |
677 | return addr; | |
678 | } | |
679 | uint64_t peek_sent_available() { | |
680 | return _tcb->peek_sent_available(); | |
681 | } | |
682 | int is_connected() const { return _tcb->is_connected(); } | |
683 | }; | |
684 | class listener { | |
685 | tcp& _tcp; | |
686 | uint16_t _port; | |
687 | int _fd = -1; | |
688 | int16_t _errno; | |
689 | queue<connection> _q; | |
690 | size_t _q_max_length; | |
691 | ||
692 | private: | |
693 | listener(tcp& t, uint16_t port, size_t queue_length) | |
694 | : _tcp(t), _port(port), _errno(0), _q(), _q_max_length(queue_length) { | |
695 | } | |
696 | public: | |
697 | listener(const listener&) = delete; | |
698 | void operator=(const listener&) = delete; | |
699 | listener(listener&& x) | |
700 | : _tcp(x._tcp), _port(x._port), _fd(std::move(x._fd)), _errno(x._errno), | |
701 | _q(std::move(x._q)) { | |
702 | if (_fd >= 0) | |
703 | _tcp._listening[_port] = this; | |
704 | } | |
705 | ~listener() { | |
706 | abort_accept(); | |
707 | } | |
708 | int listen() { | |
709 | if (_tcp._listening.find(_port) != _tcp._listening.end()) | |
710 | return -EADDRINUSE; | |
711 | _tcp._listening.emplace(_port, this); | |
712 | _fd = _tcp.manager.get_eventfd(); | |
713 | return 0; | |
714 | } | |
715 | Tub<connection> accept() { | |
716 | Tub<connection> c; | |
717 | if (!_q.empty()) { | |
718 | c = std::move(_q.front()); | |
719 | _q.pop(); | |
720 | } | |
721 | return c; | |
722 | } | |
723 | void abort_accept() { | |
724 | while (!_q.empty()) | |
725 | _q.pop(); | |
726 | if (_fd >= 0) { | |
727 | _tcp._listening.erase(_port); | |
728 | _tcp.manager.close(_fd); | |
729 | _fd = -1; | |
730 | } | |
731 | } | |
732 | int16_t get_errno() const { | |
733 | return _errno; | |
734 | } | |
735 | bool full() const { | |
736 | return _q.size() == _q_max_length; | |
737 | } | |
738 | int fd() const { | |
739 | return _fd; | |
740 | } | |
741 | friend class tcp; | |
742 | }; | |
743 | public: | |
744 | explicit tcp(CephContext *c, inet_type& inet, EventCenter *cen); | |
745 | void received(Packet p, ipaddr from, ipaddr to); | |
746 | bool forward(forward_hash& out_hash_data, Packet& p, size_t off); | |
747 | listener listen(uint16_t port, size_t queue_length = 100); | |
748 | connection connect(const entity_addr_t &addr); | |
749 | const hw_features& get_hw_features() const { return _inet._inet.get_hw_features(); } | |
750 | void poll_tcb(const ethernet_address &dst, lw_shared_ptr<tcb> tcb) { | |
751 | _poll_tcbs.emplace_back(std::move(tcb), dst); | |
752 | } | |
753 | bool push_listen_queue(uint16_t port, tcb *t) { | |
754 | auto listener = _listening.find(port); | |
755 | if (listener == _listening.end() || listener->second->full()) { | |
756 | return false; | |
757 | } | |
758 | listener->second->_q.push(connection(t->shared_from_this())); | |
759 | manager.notify(listener->second->_fd, EVENT_READABLE); | |
760 | return true; | |
761 | } | |
762 | ||
763 | private: | |
764 | void send_packet_without_tcb(ipaddr from, ipaddr to, Packet p); | |
765 | void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip); | |
766 | friend class listener; | |
767 | }; | |
768 | ||
769 | template <typename InetTraits> | |
770 | tcp<InetTraits>::tcp(CephContext *c, inet_type& inet, EventCenter *cen) | |
771 | : cct(c), _inet(inet), center(cen), | |
772 | manager(static_cast<DPDKDriver*>(cen->get_driver())->manager), | |
773 | _e(_rd()), _queue_space(cct, "DPDK::tcp::queue_space", 81920) { | |
774 | int tcb_polled = 0u; | |
775 | _inet.register_packet_provider([this, tcb_polled] () mutable { | |
776 | Tub<typename InetTraits::l4packet> l4p; | |
777 | auto c = _poll_tcbs.size(); | |
778 | if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) { | |
779 | l4p = std::move(_packetq.front()); | |
780 | _packetq.pop_front(); | |
781 | _queue_space.put(l4p->p.len()); | |
782 | } else { | |
783 | while (c--) { | |
784 | tcb_polled++; | |
785 | lw_shared_ptr<tcb> tcb; | |
786 | ethernet_address dst; | |
787 | std::tie(tcb, dst) = std::move(_poll_tcbs.front()); | |
788 | _poll_tcbs.pop_front(); | |
789 | l4p = std::move(tcb->get_packet()); | |
790 | if (l4p) { | |
791 | l4p->e_dst = dst; | |
792 | break; | |
793 | } | |
794 | } | |
795 | } | |
796 | return l4p; | |
797 | }); | |
798 | } | |
799 | ||
800 | template <typename InetTraits> | |
801 | auto tcp<InetTraits>::listen(uint16_t port, size_t queue_length) -> listener { | |
802 | return listener(*this, port, queue_length); | |
803 | } | |
804 | ||
805 | template <typename InetTraits> | |
806 | typename tcp<InetTraits>::connection tcp<InetTraits>::connect(const entity_addr_t &addr) { | |
807 | uint16_t src_port; | |
808 | connid id; | |
809 | auto src_ip = _inet._inet.host_address(); | |
810 | auto dst_ip = ipv4_address(addr); | |
811 | auto dst_port = addr.get_port(); | |
812 | ||
813 | do { | |
814 | src_port = _port_dist(_e); | |
815 | id = connid{src_ip, dst_ip, src_port, (uint16_t)dst_port}; | |
816 | if (_tcbs.find(id) == _tcbs.end()) { | |
817 | if (_inet._inet.netif()->hw_queues_count() == 1 || | |
818 | _inet._inet.netif()->hash2cpu( | |
819 | id.hash(_inet._inet.netif()->rss_key())) == center->get_id()) | |
820 | break; | |
821 | } | |
822 | } while (true); | |
823 | ||
824 | auto tcbp = make_lw_shared<tcb>(*this, id); | |
825 | _tcbs.insert({id, tcbp}); | |
826 | tcbp->connect(); | |
827 | return connection(tcbp); | |
828 | } | |
829 | ||
830 | template <typename InetTraits> | |
831 | bool tcp<InetTraits>::forward(forward_hash& out_hash_data, Packet& p, size_t off) { | |
832 | auto th = p.get_header<tcp_hdr>(off); | |
833 | if (th) { | |
834 | out_hash_data.push_back(th->src_port); | |
835 | out_hash_data.push_back(th->dst_port); | |
836 | } | |
837 | return true; | |
838 | } | |
839 | ||
840 | template <typename InetTraits> | |
841 | void tcp<InetTraits>::received(Packet p, ipaddr from, ipaddr to) { | |
842 | auto th = p.get_header<tcp_hdr>(0); | |
843 | if (!th) { | |
844 | return; | |
845 | } | |
846 | // th->data_offset is correct even before ntoh() | |
847 | if (unsigned(th->data_offset * 4) < sizeof(*th)) { | |
848 | return; | |
849 | } | |
850 | ||
851 | if (!get_hw_features().rx_csum_offload) { | |
852 | checksummer csum; | |
853 | InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len()); | |
854 | csum.sum(p); | |
855 | if (csum.get() != 0) { | |
856 | return; | |
857 | } | |
858 | } | |
859 | auto h = th->ntoh(); | |
860 | auto id = connid{to, from, h.dst_port, h.src_port}; | |
861 | auto tcbi = _tcbs.find(id); | |
862 | lw_shared_ptr<tcb> tcbp; | |
863 | if (tcbi == _tcbs.end()) { | |
864 | auto listener = _listening.find(id.local_port); | |
865 | if (listener == _listening.end() || listener->second->full()) { | |
866 | // 1) In CLOSE state | |
867 | // 1.1 all data in the incoming segment is discarded. An incoming | |
868 | // segment containing a RST is discarded. An incoming segment not | |
869 | // containing a RST causes a RST to be sent in response. | |
870 | // FIXME: | |
871 | // if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK> | |
872 | // if ACK on: <SEQ=SEG.ACK><CTL=RST> | |
873 | return respond_with_reset(&h, id.local_ip, id.foreign_ip); | |
874 | } else { | |
875 | // 2) In LISTEN state | |
876 | // 2.1 first check for an RST | |
877 | if (h.f_rst) { | |
878 | // An incoming RST should be ignored | |
879 | return; | |
880 | } | |
881 | // 2.2 second check for an ACK | |
882 | if (h.f_ack) { | |
883 | // Any acknowledgment is bad if it arrives on a connection | |
884 | // still in the LISTEN state. | |
885 | // <SEQ=SEG.ACK><CTL=RST> | |
886 | return respond_with_reset(&h, id.local_ip, id.foreign_ip); | |
887 | } | |
888 | // 2.3 third check for a SYN | |
889 | if (h.f_syn) { | |
890 | // check the security | |
891 | // NOTE: Ignored for now | |
892 | tcbp = make_lw_shared<tcb>(*this, id); | |
893 | _tcbs.insert({id, tcbp}); | |
894 | return tcbp->input_handle_listen_state(&h, std::move(p)); | |
895 | } | |
896 | // 2.4 fourth other text or control | |
897 | // So you are unlikely to get here, but if you do, drop the | |
898 | // segment, and return. | |
899 | return; | |
900 | } | |
901 | } else { | |
902 | tcbp = tcbi->second; | |
903 | if (tcbp->state() == tcp_state::SYN_SENT) { | |
904 | // 3) In SYN_SENT State | |
905 | return tcbp->input_handle_syn_sent_state(&h, std::move(p)); | |
906 | } else { | |
907 | // 4) In other state, can be one of the following: | |
908 | // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2 | |
909 | // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT | |
910 | return tcbp->input_handle_other_state(&h, std::move(p)); | |
911 | } | |
912 | } | |
913 | } | |
914 | ||
915 | // Send packet does not belong to any tcb | |
916 | template <typename InetTraits> | |
917 | void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, Packet p) { | |
918 | if (_queue_space.get_or_fail(p.len())) { // drop packets that do not fit the queue | |
919 | _inet.wait_l2_dst_address(to, std::move(p), [this, to] (const ethernet_address &e_dst, Packet p, int r) mutable { | |
920 | if (r == 0) | |
921 | _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp}); | |
922 | }); | |
923 | } | |
924 | } | |
925 | ||
926 | template <typename InetTraits> | |
927 | tcp<InetTraits>::connection::~connection() { | |
928 | if (_tcb) { | |
929 | _tcb->_conn = nullptr; | |
930 | close_read(); | |
931 | close_write(); | |
932 | } | |
933 | } | |
934 | ||
935 | template <typename InetTraits> | |
936 | tcp<InetTraits>::tcb::tcb(tcp& t, connid id) | |
937 | : _tcp(t), manager(t.manager), _local_ip(id.local_ip) , _foreign_ip(id.foreign_ip), | |
938 | _local_port(id.local_port), _foreign_port(id.foreign_port), | |
939 | _snd(_tcp.cct), | |
940 | center(t.center), | |
941 | fd(t.manager.get_eventfd()), | |
942 | delayed_ack_event(new tcp<InetTraits>::C_handle_delayed_ack(this)), | |
943 | retransmit_event(new tcp<InetTraits>::C_handle_retransmit(this)), | |
944 | persist_event(new tcp<InetTraits>::C_handle_persist(this)), | |
945 | all_data_ack_event(new tcp<InetTraits>::C_all_data_acked(this)) {} | |
946 | ||
947 | template <typename InetTraits> | |
948 | tcp<InetTraits>::tcb::~tcb() | |
949 | { | |
950 | if (_delayed_ack_fd) | |
951 | center->delete_time_event(*_delayed_ack_fd); | |
952 | if (retransmit_fd) | |
953 | center->delete_time_event(*retransmit_fd); | |
954 | if (persist_fd) | |
955 | center->delete_time_event(*persist_fd); | |
956 | delete delayed_ack_event; | |
957 | delete retransmit_event; | |
958 | delete persist_event; | |
959 | delete all_data_ack_event; | |
960 | manager.close(fd); | |
961 | fd = -1; | |
962 | } | |
963 | ||
964 | template <typename InetTraits> | |
965 | void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth) | |
966 | { | |
967 | _tcp.respond_with_reset(rth, _local_ip, _foreign_ip); | |
968 | } | |
969 | ||
970 | template <typename InetTraits> | |
971 | uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_sequence seg_ack) { | |
972 | uint32_t total_acked_bytes = 0; | |
973 | // Full ACK of segment | |
974 | while (!_snd.data.empty() | |
975 | && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) { | |
976 | auto acked_bytes = _snd.data.front().p.len(); | |
977 | _snd.unacknowledged += acked_bytes; | |
978 | // Ignore retransmitted segments when setting the RTO | |
979 | if (_snd.data.front().nr_transmits == 0) { | |
980 | update_rto(_snd.data.front().tx_time); | |
981 | } | |
982 | update_cwnd(acked_bytes); | |
983 | total_acked_bytes += acked_bytes; | |
984 | _snd.user_queue_space.put(_snd.data.front().data_len); | |
985 | manager.notify(fd, EVENT_WRITABLE); | |
986 | _snd.data.pop_front(); | |
987 | } | |
988 | // Partial ACK of segment | |
989 | if (_snd.unacknowledged < seg_ack) { | |
990 | auto acked_bytes = seg_ack - _snd.unacknowledged; | |
991 | if (!_snd.data.empty()) { | |
992 | auto& unacked_seg = _snd.data.front(); | |
993 | unacked_seg.p.trim_front(acked_bytes); | |
994 | } | |
995 | _snd.unacknowledged = seg_ack; | |
996 | update_cwnd(acked_bytes); | |
997 | total_acked_bytes += acked_bytes; | |
998 | } | |
999 | return total_acked_bytes; | |
1000 | } | |
1001 | ||
1002 | template <typename InetTraits> | |
1003 | bool tcp<InetTraits>::tcb::segment_acceptable(tcp_sequence seg_seq, unsigned seg_len) { | |
1004 | if (seg_len == 0 && _rcv.window == 0) { | |
1005 | // SEG.SEQ = RCV.NXT | |
1006 | return seg_seq == _rcv.next; | |
1007 | } else if (seg_len == 0 && _rcv.window > 0) { | |
1008 | // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND | |
1009 | return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window); | |
1010 | } else if (seg_len > 0 && _rcv.window > 0) { | |
1011 | // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND | |
1012 | // or | |
1013 | // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND | |
1014 | bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window); | |
1015 | bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window); | |
1016 | return x || y; | |
1017 | } else { | |
1018 | // SEG.LEN > 0 RCV.WND = 0, not acceptable | |
1019 | return false; | |
1020 | } | |
1021 | } | |
1022 | ||
1023 | template <typename InetTraits> | |
1024 | void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) { | |
1025 | // Handle tcp options | |
1026 | _option.parse(opt_start, opt_end); | |
1027 | ||
1028 | // Remote receive window scale factor | |
1029 | _snd.window_scale = _option._remote_win_scale; | |
1030 | // Local receive window scale factor | |
1031 | _rcv.window_scale = _option._local_win_scale; | |
1032 | ||
1033 | // Maximum segment size remote can receive | |
1034 | _snd.mss = _option._remote_mss; | |
1035 | // Maximum segment size local can receive | |
1036 | _rcv.mss = _option._local_mss = local_mss(); | |
1037 | ||
1038 | // Linux's default window size | |
1039 | _rcv.window = 29200 << _rcv.window_scale; | |
1040 | _snd.window = th->window << _snd.window_scale; | |
1041 | ||
1042 | // Segment sequence number used for last window update | |
1043 | _snd.wl1 = th->seq; | |
1044 | // Segment acknowledgment number used for last window update | |
1045 | _snd.wl2 = th->ack; | |
1046 | ||
1047 | // Setup initial congestion window | |
1048 | if (2190 < _snd.mss) { | |
1049 | _snd.cwnd = 2 * _snd.mss; | |
1050 | } else if (1095 < _snd.mss && _snd.mss <= 2190) { | |
1051 | _snd.cwnd = 3 * _snd.mss; | |
1052 | } else { | |
1053 | _snd.cwnd = 4 * _snd.mss; | |
1054 | } | |
1055 | ||
1056 | // Setup initial slow start threshold | |
1057 | _snd.ssthresh = th->window << _snd.window_scale; | |
1058 | } | |
1059 | ||
1060 | template <typename InetTraits> | |
1061 | Packet tcp<InetTraits>::tcb::get_transmit_packet() { | |
1062 | // easy case: empty queue | |
1063 | if (_snd.unsent.empty()) { | |
1064 | return Packet(); | |
1065 | } | |
1066 | auto can_send = this->can_send(); | |
1067 | // Max number of TCP payloads we can pass to NIC | |
1068 | uint32_t len; | |
1069 | if (_tcp.get_hw_features().tx_tso) { | |
1070 | // FIXME: Info tap device the size of the splitted packet | |
1071 | len = _tcp.get_hw_features().max_packet_len - tcp_hdr_len_min - InetTraits::ip_hdr_len_min; | |
1072 | } else { | |
1073 | len = std::min(uint16_t(_tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss); | |
1074 | } | |
1075 | can_send = std::min(can_send, len); | |
1076 | // easy case: one small packet | |
1077 | if (_snd.unsent.front().len() <= can_send) { | |
1078 | auto p = std::move(_snd.unsent.front()); | |
1079 | _snd.unsent.pop_front(); | |
1080 | _snd.unsent_len -= p.len(); | |
1081 | return p; | |
1082 | } | |
1083 | // moderate case: need to split one packet | |
1084 | if (_snd.unsent.front().len() > can_send) { | |
1085 | auto p = _snd.unsent.front().share(0, can_send); | |
1086 | _snd.unsent.front().trim_front(can_send); | |
1087 | _snd.unsent_len -= p.len(); | |
1088 | return p; | |
1089 | } | |
1090 | // hard case: merge some packets, possibly split last | |
1091 | auto p = std::move(_snd.unsent.front()); | |
1092 | _snd.unsent.pop_front(); | |
1093 | can_send -= p.len(); | |
1094 | while (!_snd.unsent.empty() | |
1095 | && _snd.unsent.front().len() <= can_send) { | |
1096 | can_send -= _snd.unsent.front().len(); | |
1097 | p.append(std::move(_snd.unsent.front())); | |
1098 | _snd.unsent.pop_front(); | |
1099 | } | |
1100 | // FIXME: this will result in calling "deleter" of packet which free managed objects | |
1101 | // will used later | |
1102 | // if (!_snd.unsent.empty() && can_send) { | |
1103 | // auto& q = _snd.unsent.front(); | |
1104 | // p.append(q.share(0, can_send)); | |
1105 | // q.trim_front(can_send); | |
1106 | // } | |
1107 | _snd.unsent_len -= p.len(); | |
1108 | return p; | |
1109 | } | |
1110 | ||
1111 | template <typename InetTraits> | |
1112 | void tcp<InetTraits>::tcb::output_one(bool data_retransmit) { | |
1113 | if (in_state(CLOSED)) { | |
1114 | return; | |
1115 | } | |
1116 | ||
1117 | Packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet(); | |
1118 | Packet clone = p.share(); // early clone to prevent share() from calling packet::unuse_internal_data() on header. | |
1119 | uint16_t len = p.len(); | |
1120 | bool syn_on = syn_needs_on(); | |
1121 | bool ack_on = ack_needs_on(); | |
1122 | ||
1123 | auto options_size = _option.get_size(syn_on, ack_on); | |
1124 | auto th = p.prepend_header<tcp_hdr>(options_size); | |
1125 | ||
1126 | th->src_port = _local_port; | |
1127 | th->dst_port = _foreign_port; | |
1128 | ||
1129 | th->f_syn = syn_on; | |
1130 | th->f_ack = ack_on; | |
1131 | if (ack_on) { | |
1132 | clear_delayed_ack(); | |
1133 | } | |
1134 | th->f_urg = false; | |
1135 | th->f_psh = false; | |
1136 | ||
1137 | tcp_sequence seq; | |
1138 | if (data_retransmit) { | |
1139 | seq = _snd.unacknowledged; | |
1140 | } else { | |
1141 | seq = syn_on ? _snd.initial : _snd.next; | |
1142 | _snd.next += len; | |
1143 | } | |
1144 | th->seq = seq; | |
1145 | th->ack = _rcv.next; | |
1146 | th->data_offset = (sizeof(*th) + options_size) / 4; | |
1147 | th->window = _rcv.window >> _rcv.window_scale; | |
1148 | th->checksum = 0; | |
1149 | ||
1150 | // FIXME: does the FIN have to fit in the window? | |
1151 | bool fin_on = fin_needs_on(); | |
1152 | th->f_fin = fin_on; | |
1153 | ||
1154 | // Add tcp options | |
1155 | _option.fill(th, options_size); | |
1156 | *th = th->hton(); | |
1157 | ||
1158 | offload_info oi; | |
1159 | checksummer csum; | |
1160 | uint16_t pseudo_hdr_seg_len = 0; | |
1161 | ||
1162 | oi.tcp_hdr_len = sizeof(tcp_hdr) + options_size; | |
1163 | ||
1164 | if (_tcp.get_hw_features().tx_csum_l4_offload) { | |
1165 | oi.needs_csum = true; | |
1166 | ||
1167 | // | |
1168 | // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's | |
1169 | // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones' | |
1170 | // complement sum of the pseudo header. | |
1171 | // | |
1172 | // For TSO the csum should be calculated for a pseudo header with | |
1173 | // segment length set to 0. All the rest is the same as for a TCP Tx | |
1174 | // CSUM offload case. | |
1175 | // | |
1176 | if (_tcp.get_hw_features().tx_tso && len > _snd.mss) { | |
1177 | oi.tso_seg_size = _snd.mss; | |
1178 | } else { | |
1179 | pseudo_hdr_seg_len = sizeof(*th) + options_size + len; | |
1180 | } | |
1181 | } else { | |
1182 | pseudo_hdr_seg_len = sizeof(*th) + options_size + len; | |
1183 | oi.needs_csum = false; | |
1184 | } | |
1185 | ||
1186 | InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip, | |
1187 | pseudo_hdr_seg_len); | |
1188 | ||
1189 | if (_tcp.get_hw_features().tx_csum_l4_offload) { | |
1190 | th->checksum = ~csum.get(); | |
1191 | } else { | |
1192 | csum.sum(p); | |
1193 | th->checksum = csum.get(); | |
1194 | } | |
1195 | ||
1196 | oi.protocol = ip_protocol_num::tcp; | |
1197 | ||
1198 | p.set_offload_info(oi); | |
1199 | ||
1200 | if (!data_retransmit && (len || syn_on || fin_on)) { | |
1201 | auto now = clock_type::now(); | |
1202 | if (len) { | |
1203 | unsigned nr_transmits = 0; | |
1204 | _snd.data.emplace_back(unacked_segment{std::move(clone), | |
1205 | len, nr_transmits, now}); | |
1206 | } | |
1207 | if (!retransmit_fd) { | |
1208 | start_retransmit_timer(); | |
1209 | } | |
1210 | } | |
1211 | ||
1212 | queue_packet(std::move(p)); | |
1213 | } | |
1214 | ||
1215 | template <typename InetTraits> | |
1216 | bool tcp<InetTraits>::tcb::is_all_data_acked() { | |
1217 | if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) { | |
1218 | return true; | |
1219 | } | |
1220 | return false; | |
1221 | } | |
1222 | ||
1223 | template <typename InetTraits> | |
1224 | Tub<Packet> tcp<InetTraits>::tcb::read() { | |
1225 | Tub<Packet> p; | |
1226 | if (_rcv.data.empty()) | |
1227 | return p; | |
1228 | ||
1229 | p.construct(); | |
1230 | for (auto&& q : _rcv.data) { | |
1231 | p->append(std::move(q)); | |
1232 | } | |
1233 | _rcv.data.clear(); | |
1234 | return p; | |
1235 | } | |
1236 | ||
1237 | template <typename InetTraits> | |
1238 | int tcp<InetTraits>::tcb::send(Packet p) { | |
1239 | // We can not send after the connection is closed | |
1240 | assert(!_snd.closed); | |
1241 | ||
1242 | if (in_state(CLOSED)) | |
1243 | return -ECONNRESET; | |
1244 | ||
1245 | auto len = p.len(); | |
1246 | if (!_snd.user_queue_space.get_or_fail(len)) { | |
1247 | // note: caller must ensure enough queue space to send | |
1248 | ceph_abort(); | |
1249 | } | |
1250 | // TODO: Handle p.len() > max user_queue_space case | |
1251 | _snd.queued_len += len; | |
1252 | _snd.unsent_len += len; | |
1253 | _snd.queued_len -= len; | |
1254 | _snd.unsent.push_back(std::move(p)); | |
1255 | if (can_send() > 0) { | |
1256 | output(); | |
1257 | } | |
1258 | return len; | |
1259 | } | |
1260 | ||
1261 | template <typename InetTraits> | |
1262 | void tcp<InetTraits>::tcb::close() { | |
1263 | if (in_state(CLOSED) || _snd.closed) { | |
1264 | return ; | |
1265 | } | |
1266 | // TODO: We should make this asynchronous | |
1267 | ||
1268 | _errno = -EPIPE; | |
1269 | center->delete_file_event(fd, EVENT_READABLE|EVENT_WRITABLE); | |
1270 | bool acked = is_all_data_acked(); | |
1271 | if (!acked) { | |
1272 | _snd._all_data_acked_fd = manager.get_eventfd(); | |
1273 | center->create_file_event(_snd._all_data_acked_fd, EVENT_READABLE, all_data_ack_event); | |
1274 | } else { | |
1275 | close_final_cleanup(); | |
1276 | } | |
1277 | } | |
1278 | ||
1279 | template <typename InetTraits> | |
1280 | bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) { | |
1281 | // We've received a TSO packet, do ack immediately | |
1282 | if (seg_len > _rcv.mss) { | |
1283 | _nr_full_seg_received = 0; | |
1284 | if (_delayed_ack_fd) { | |
1285 | center->delete_time_event(*_delayed_ack_fd); | |
1286 | _delayed_ack_fd.destroy(); | |
1287 | } | |
1288 | return true; | |
1289 | } | |
1290 | ||
1291 | // We've received a full sized segment, ack for every second full sized segment | |
1292 | if (seg_len == _rcv.mss) { | |
1293 | if (_nr_full_seg_received++ >= 1) { | |
1294 | _nr_full_seg_received = 0; | |
1295 | if (_delayed_ack_fd) { | |
1296 | center->delete_time_event(*_delayed_ack_fd); | |
1297 | _delayed_ack_fd.destroy(); | |
1298 | } | |
1299 | return true; | |
1300 | } | |
1301 | } | |
1302 | ||
1303 | // If the timer is armed and its callback hasn't been run. | |
1304 | if (_delayed_ack_fd) { | |
1305 | return false; | |
1306 | } | |
1307 | ||
1308 | // If the timer is not armed, schedule a delayed ACK. | |
1309 | // The maximum delayed ack timer allowed by RFC1122 is 500ms, most | |
1310 | // implementations use 200ms. | |
1311 | _delayed_ack_fd.construct(center->create_time_event(200*1000, delayed_ack_event)); | |
1312 | return false; | |
1313 | } | |
1314 | ||
1315 | template <typename InetTraits> | |
1316 | void tcp<InetTraits>::tcb::clear_delayed_ack() { | |
1317 | if (_delayed_ack_fd) { | |
1318 | center->delete_time_event(*_delayed_ack_fd); | |
1319 | _delayed_ack_fd.destroy(); | |
1320 | } | |
1321 | } | |
1322 | ||
1323 | template <typename InetTraits> | |
1324 | bool tcp<InetTraits>::tcb::merge_out_of_order() { | |
1325 | bool merged = false; | |
1326 | if (_rcv.out_of_order.map.empty()) { | |
1327 | return merged; | |
1328 | } | |
1329 | for (auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) { | |
1330 | auto& p = it->second; | |
1331 | auto seg_beg = it->first; | |
1332 | auto seg_len = p.len(); | |
1333 | auto seg_end = seg_beg + seg_len; | |
1334 | if (seg_beg <= _rcv.next && seg_end > _rcv.next) { | |
1335 | // This segment has been received out of order and its previous | |
1336 | // segment has been received now | |
1337 | auto trim = _rcv.next - seg_beg; | |
1338 | if (trim) { | |
1339 | p.trim_front(trim); | |
1340 | seg_len -= trim; | |
1341 | } | |
1342 | _rcv.next += seg_len; | |
1343 | _rcv.data.push_back(std::move(p)); | |
1344 | // Since c++11, erase() always returns the value of the following element | |
1345 | it = _rcv.out_of_order.map.erase(it); | |
1346 | merged = true; | |
1347 | } else if (_rcv.next >= seg_end) { | |
1348 | // This segment has been receive already, drop it | |
1349 | it = _rcv.out_of_order.map.erase(it); | |
1350 | } else { | |
1351 | // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only, | |
1352 | // so we can stop looking here. | |
1353 | it++; | |
1354 | break; | |
1355 | } | |
1356 | } | |
1357 | return merged; | |
1358 | } | |
1359 | ||
1360 | template <typename InetTraits> | |
1361 | void tcp<InetTraits>::tcb::insert_out_of_order(tcp_sequence seg, Packet p) { | |
1362 | _rcv.out_of_order.merge(seg, std::move(p)); | |
1363 | } | |
1364 | ||
1365 | template <typename InetTraits> | |
1366 | void tcp<InetTraits>::tcb::trim_receive_data_after_window() { | |
1367 | abort(); | |
1368 | } | |
1369 | ||
1370 | template <typename InetTraits> | |
1371 | void tcp<InetTraits>::tcb::fast_retransmit() { | |
1372 | if (!_snd.data.empty()) { | |
1373 | auto& unacked_seg = _snd.data.front(); | |
1374 | unacked_seg.nr_transmits++; | |
1375 | retransmit_one(); | |
1376 | output(); | |
1377 | } | |
1378 | } | |
1379 | ||
1380 | template <typename InetTraits> | |
1381 | void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) { | |
1382 | // Update RTO according to RFC6298 | |
1383 | auto R = std::chrono::duration_cast<std::chrono::microseconds>(clock_type::now() - tx_time); | |
1384 | if (_snd.first_rto_sample) { | |
1385 | _snd.first_rto_sample = false; | |
1386 | // RTTVAR <- R/2 | |
1387 | // SRTT <- R | |
1388 | _snd.rttvar = R / 2; | |
1389 | _snd.srtt = R; | |
1390 | } else { | |
1391 | // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'| | |
1392 | // SRTT <- (1 - alpha) * SRTT + alpha * R' | |
1393 | // where alpha = 1/8 and beta = 1/4 | |
1394 | auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt); | |
1395 | _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4; | |
1396 | _snd.srtt = _snd.srtt * 7 / 8 + R / 8; | |
1397 | } | |
1398 | // RTO <- SRTT + max(G, K * RTTVAR) | |
1399 | _rto = _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar); | |
1400 | ||
1401 | // Make sure 1 sec << _rto << 60 sec | |
1402 | _rto = std::max(_rto, _rto_min); | |
1403 | _rto = std::min(_rto, _rto_max); | |
1404 | } | |
1405 | ||
1406 | template <typename InetTraits> | |
1407 | void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) { | |
1408 | uint32_t smss = _snd.mss; | |
1409 | if (_snd.cwnd < _snd.ssthresh) { | |
1410 | // In slow start phase | |
1411 | _snd.cwnd += std::min(acked_bytes, smss); | |
1412 | } else { | |
1413 | // In congestion avoidance phase | |
1414 | uint32_t round_up = 1; | |
1415 | _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd); | |
1416 | } | |
1417 | } | |
1418 | ||
1419 | ||
1420 | template <typename InetTraits> | |
1421 | void tcp<InetTraits>::tcb::cleanup() { | |
1422 | manager.notify(fd, EVENT_READABLE); | |
1423 | _snd.closed = true; | |
1424 | _snd.unsent.clear(); | |
1425 | _snd.data.clear(); | |
1426 | _rcv.out_of_order.map.clear(); | |
1427 | _rcv.data.clear(); | |
1428 | stop_retransmit_timer(); | |
1429 | clear_delayed_ack(); | |
1430 | center->dispatch_event_external(new tcp<InetTraits>::C_actual_remove_tcb(this)); | |
1431 | remove_from_tcbs(); | |
1432 | } | |
1433 | ||
1434 | template <typename InetTraits> | |
1435 | tcp_sequence tcp<InetTraits>::tcb::get_isn() { | |
1436 | // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers | |
1437 | // with the expression: | |
1438 | // ISN = M + F(localip, localport, remoteip, remoteport, secretkey) | |
1439 | // M is the 4 microsecond timer | |
1440 | using namespace std::chrono; | |
1441 | uint32_t hash[4]; | |
1442 | hash[0] = _local_ip.ip; | |
1443 | hash[1] = _foreign_ip.ip; | |
1444 | hash[2] = (_local_port << 16) + _foreign_port; | |
1445 | hash[3] = _isn_secret.key[15]; | |
1446 | CryptoPP::Weak::MD5::Transform(hash, _isn_secret.key); | |
1447 | auto seq = hash[0]; | |
1448 | auto m = duration_cast<microseconds>(clock_type::now().time_since_epoch()); | |
1449 | seq += m.count() / 4; | |
1450 | return make_seq(seq); | |
1451 | } | |
1452 | ||
1453 | template <typename InetTraits> | |
1454 | Tub<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() { | |
1455 | _poll_active = false; | |
1456 | if (_packetq.empty()) { | |
1457 | output_one(); | |
1458 | } | |
1459 | ||
1460 | Tub<typename InetTraits::l4packet> p; | |
1461 | if (in_state(CLOSED)) { | |
1462 | return p; | |
1463 | } | |
1464 | ||
1465 | assert(!_packetq.empty()); | |
1466 | ||
1467 | p = std::move(_packetq.front()); | |
1468 | _packetq.pop_front(); | |
1469 | if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0)) { | |
1470 | // If there are packets to send in the queue or tcb is allowed to send | |
1471 | // more add tcp back to polling set to keep sending. In addition, dupacks >= 3 | |
1472 | // is an indication that an segment is lost, stop sending more in this case. | |
1473 | output(); | |
1474 | } | |
1475 | return p; | |
1476 | } | |
1477 | ||
1478 | template <typename InetTraits> | |
1479 | void tcp<InetTraits>::connection::close_read() { | |
1480 | // do nothing | |
1481 | // _tcb->manager.notify(_tcb->fd, EVENT_READABLE); | |
1482 | } | |
1483 | ||
1484 | template <typename InetTraits> | |
1485 | void tcp<InetTraits>::connection::close_write() { | |
1486 | _tcb->close(); | |
1487 | } | |
1488 | ||
1489 | template <typename InetTraits> | |
1490 | constexpr uint16_t tcp<InetTraits>::tcb::_max_nr_retransmit; | |
1491 | ||
1492 | template <typename InetTraits> | |
1493 | constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_min; | |
1494 | ||
1495 | template <typename InetTraits> | |
1496 | constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_max; | |
1497 | ||
1498 | template <typename InetTraits> | |
1499 | constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_clk_granularity; | |
1500 | ||
1501 | template <typename InetTraits> | |
1502 | typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret; | |
1503 | ||
1504 | ||
1505 | #endif /* TCP_HH_ */ |