]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
9f95a23c | 6 | #include <seastar/core/gate.hh> |
11fdf7f2 | 7 | #include <seastar/core/reactor.hh> |
9f95a23c | 8 | #include <seastar/core/sharded.hh> |
11fdf7f2 TL |
9 | #include <seastar/net/packet.hh> |
10 | ||
11 | #include "include/buffer.h" | |
12 | ||
9f95a23c TL |
13 | #include "crimson/common/log.h" |
14 | #include "Errors.h" | |
f67539c2 | 15 | #include "Fwd.h" |
9f95a23c TL |
16 | |
17 | #ifdef UNIT_TESTS_BUILT | |
18 | #include "Interceptor.h" | |
19 | #endif | |
20 | ||
21 | namespace crimson::net { | |
22 | ||
23 | class Socket; | |
24 | using SocketRef = std::unique_ptr<Socket>; | |
11fdf7f2 TL |
25 | |
26 | class Socket | |
27 | { | |
9f95a23c TL |
28 | struct construct_tag {}; |
29 | ||
11fdf7f2 | 30 | public: |
f67539c2 TL |
31 | // if acceptor side, peer is using a different port (ephemeral_port) |
32 | // if connector side, I'm using a different port (ephemeral_port) | |
33 | enum class side_t { | |
34 | acceptor, | |
35 | connector | |
36 | }; | |
37 | ||
38 | Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag) | |
39 | : sid{seastar::this_shard_id()}, | |
11fdf7f2 TL |
40 | socket(std::move(_socket)), |
41 | in(socket.input()), | |
9f95a23c TL |
42 | // the default buffer size 8192 is too small that may impact our write |
43 | // performance. see seastar::net::connected_socket::output() | |
f67539c2 | 44 | out(socket.output(65536)), |
1e59de90 | 45 | socket_is_shutdown(false), |
f67539c2 TL |
46 | side(_side), |
47 | ephemeral_port(e_port) {} | |
9f95a23c TL |
48 | |
49 | ~Socket() { | |
50 | #ifndef NDEBUG | |
51 | assert(closed); | |
52 | #endif | |
53 | } | |
54 | ||
11fdf7f2 TL |
55 | Socket(Socket&& o) = delete; |
56 | ||
9f95a23c TL |
57 | static seastar::future<SocketRef> |
58 | connect(const entity_addr_t& peer_addr) { | |
1e59de90 TL |
59 | inject_failure(); |
60 | return inject_delay( | |
61 | ).then([peer_addr] { | |
62 | return seastar::connect(peer_addr.in4_addr()); | |
63 | }).then([] (seastar::connected_socket socket) { | |
f67539c2 TL |
64 | return std::make_unique<Socket>( |
65 | std::move(socket), side_t::connector, 0, construct_tag{}); | |
9f95a23c TL |
66 | }); |
67 | } | |
68 | ||
11fdf7f2 TL |
69 | /// read the requested number of bytes into a bufferlist |
70 | seastar::future<bufferlist> read(size_t bytes); | |
71 | using tmp_buf = seastar::temporary_buffer<char>; | |
72 | using packet = seastar::net::packet; | |
73 | seastar::future<tmp_buf> read_exactly(size_t bytes); | |
74 | ||
75 | seastar::future<> write(packet&& buf) { | |
9f95a23c | 76 | #ifdef UNIT_TESTS_BUILT |
1e59de90 TL |
77 | return try_trap_pre(next_trap_write |
78 | ).then([buf = std::move(buf), this] () mutable { | |
9f95a23c | 79 | #endif |
1e59de90 TL |
80 | inject_failure(); |
81 | return inject_delay( | |
82 | ).then([buf = std::move(buf), this] () mutable { | |
83 | return out.write(std::move(buf)); | |
84 | }); | |
9f95a23c TL |
85 | #ifdef UNIT_TESTS_BUILT |
86 | }).then([this] { | |
87 | return try_trap_post(next_trap_write); | |
88 | }); | |
89 | #endif | |
11fdf7f2 TL |
90 | } |
91 | seastar::future<> flush() { | |
1e59de90 TL |
92 | inject_failure(); |
93 | return inject_delay().then([this] { | |
94 | return out.flush(); | |
95 | }); | |
11fdf7f2 TL |
96 | } |
97 | seastar::future<> write_flush(packet&& buf) { | |
9f95a23c TL |
98 | #ifdef UNIT_TESTS_BUILT |
99 | return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { | |
100 | #endif | |
1e59de90 TL |
101 | inject_failure(); |
102 | return inject_delay( | |
103 | ).then([buf = std::move(buf), this] () mutable { | |
104 | return out.write(std::move(buf)).then([this] { return out.flush(); }); | |
105 | }); | |
9f95a23c TL |
106 | #ifdef UNIT_TESTS_BUILT |
107 | }).then([this] { | |
108 | return try_trap_post(next_trap_write); | |
109 | }); | |
110 | #endif | |
11fdf7f2 TL |
111 | } |
112 | ||
1e59de90 TL |
113 | bool is_shutdown() const { |
114 | return socket_is_shutdown; | |
115 | } | |
116 | ||
9f95a23c TL |
117 | // preemptively disable further reads or writes, can only be shutdown once. |
118 | void shutdown(); | |
119 | ||
11fdf7f2 | 120 | /// Socket can only be closed once. |
9f95a23c TL |
121 | seastar::future<> close(); |
122 | ||
1e59de90 TL |
123 | static seastar::future<> inject_delay(); |
124 | ||
125 | static void inject_failure(); | |
126 | ||
127 | // shutdown for tests | |
128 | void force_shutdown() { | |
129 | socket.shutdown_input(); | |
130 | socket.shutdown_output(); | |
131 | } | |
132 | ||
9f95a23c TL |
133 | // shutdown input_stream only, for tests |
134 | void force_shutdown_in() { | |
135 | socket.shutdown_input(); | |
136 | } | |
137 | ||
138 | // shutdown output_stream only, for tests | |
139 | void force_shutdown_out() { | |
140 | socket.shutdown_output(); | |
141 | } | |
142 | ||
f67539c2 TL |
143 | side_t get_side() const { |
144 | return side; | |
145 | } | |
146 | ||
147 | uint16_t get_ephemeral_port() const { | |
148 | return ephemeral_port; | |
149 | } | |
150 | ||
151 | // learn my ephemeral_port as connector. | |
152 | // unfortunately, there's no way to identify which port I'm using as | |
153 | // connector with current seastar interface. | |
154 | void learn_ephemeral_port_as_connector(uint16_t port) { | |
155 | assert(side == side_t::connector && | |
156 | (ephemeral_port == 0 || ephemeral_port == port)); | |
157 | ephemeral_port = port; | |
158 | } | |
159 | ||
20effc67 TL |
160 | seastar::socket_address get_local_address() const { |
161 | return socket.local_address(); | |
162 | } | |
163 | ||
f67539c2 TL |
164 | private: |
165 | const seastar::shard_id sid; | |
166 | seastar::connected_socket socket; | |
167 | seastar::input_stream<char> in; | |
168 | seastar::output_stream<char> out; | |
1e59de90 | 169 | bool socket_is_shutdown; |
f67539c2 TL |
170 | side_t side; |
171 | uint16_t ephemeral_port; | |
172 | ||
173 | #ifndef NDEBUG | |
174 | bool closed = false; | |
175 | #endif | |
176 | ||
177 | /// buffer state for read() | |
178 | struct { | |
179 | bufferlist buffer; | |
180 | size_t remaining; | |
181 | } r; | |
182 | ||
9f95a23c | 183 | #ifdef UNIT_TESTS_BUILT |
f67539c2 TL |
184 | public: |
185 | void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); | |
186 | ||
9f95a23c TL |
187 | private: |
188 | bp_action_t next_trap_read = bp_action_t::CONTINUE; | |
189 | bp_action_t next_trap_write = bp_action_t::CONTINUE; | |
190 | socket_blocker* blocker = nullptr; | |
191 | seastar::future<> try_trap_pre(bp_action_t& trap); | |
192 | seastar::future<> try_trap_post(bp_action_t& trap); | |
193 | ||
9f95a23c TL |
194 | #endif |
195 | friend class FixedCPUServerSocket; | |
196 | }; | |
197 | ||
20effc67 TL |
198 | using listen_ertr = crimson::errorator< |
199 | crimson::ct_error::address_in_use, // The address is already bound | |
200 | crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/ | |
201 | >; | |
202 | ||
9f95a23c TL |
203 | class FixedCPUServerSocket |
204 | : public seastar::peering_sharded_service<FixedCPUServerSocket> { | |
205 | const seastar::shard_id cpu; | |
206 | entity_addr_t addr; | |
207 | std::optional<seastar::server_socket> listener; | |
208 | seastar::gate shutdown_gate; | |
209 | ||
210 | using sharded_service_t = seastar::sharded<FixedCPUServerSocket>; | |
211 | std::unique_ptr<sharded_service_t> service; | |
212 | ||
213 | struct construct_tag {}; | |
214 | ||
215 | static seastar::logger& logger() { | |
216 | return crimson::get_logger(ceph_subsys_ms); | |
217 | } | |
218 | ||
219 | seastar::future<> reset() { | |
220 | return container().invoke_on_all([] (auto& ss) { | |
221 | assert(ss.shutdown_gate.is_closed()); | |
9f95a23c TL |
222 | ss.addr = entity_addr_t(); |
223 | ss.listener.reset(); | |
224 | }); | |
225 | } | |
226 | ||
227 | public: | |
228 | FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {} | |
229 | ~FixedCPUServerSocket() { | |
230 | assert(!listener); | |
231 | // detect whether user have called destroy() properly | |
232 | ceph_assert(!service); | |
233 | } | |
234 | ||
235 | FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; | |
236 | FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; | |
237 | FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; | |
238 | ||
f67539c2 | 239 | listen_ertr::future<> listen(entity_addr_t addr); |
9f95a23c TL |
240 | |
241 | // fn_accept should be a nothrow function of type | |
242 | // seastar::future<>(SocketRef, entity_addr_t) | |
243 | template <typename Func> | |
244 | seastar::future<> accept(Func&& fn_accept) { | |
f67539c2 | 245 | assert(seastar::this_shard_id() == cpu); |
9f95a23c TL |
246 | logger().trace("FixedCPUServerSocket({})::accept()...", addr); |
247 | return container().invoke_on_all( | |
248 | [fn_accept = std::move(fn_accept)] (auto& ss) mutable { | |
249 | assert(ss.listener); | |
250 | // gate accepting | |
251 | // FixedCPUServerSocket::shutdown() will drain the continuations in the gate | |
252 | // so ignore the returned future | |
253 | std::ignore = seastar::with_gate(ss.shutdown_gate, | |
254 | [&ss, fn_accept = std::move(fn_accept)] () mutable { | |
255 | return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable { | |
256 | return ss.listener->accept().then( | |
257 | [&ss, fn_accept = std::move(fn_accept)] | |
258 | (seastar::accept_result accept_result) mutable { | |
259 | // assert seastar::listen_options::set_fixed_cpu() works | |
f67539c2 | 260 | assert(seastar::this_shard_id() == ss.cpu); |
9f95a23c TL |
261 | auto [socket, paddr] = std::move(accept_result); |
262 | entity_addr_t peer_addr; | |
263 | peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); | |
20effc67 | 264 | peer_addr.set_type(ss.addr.get_type()); |
9f95a23c | 265 | SocketRef _socket = std::make_unique<Socket>( |
f67539c2 TL |
266 | std::move(socket), Socket::side_t::acceptor, |
267 | peer_addr.get_port(), Socket::construct_tag{}); | |
9f95a23c TL |
268 | std::ignore = seastar::with_gate(ss.shutdown_gate, |
269 | [socket = std::move(_socket), peer_addr, | |
270 | &ss, fn_accept = std::move(fn_accept)] () mutable { | |
271 | logger().trace("FixedCPUServerSocket({})::accept(): " | |
272 | "accepted peer {}", ss.addr, peer_addr); | |
273 | return fn_accept(std::move(socket), peer_addr | |
274 | ).handle_exception([&ss, peer_addr] (auto eptr) { | |
275 | logger().error("FixedCPUServerSocket({})::accept(): " | |
276 | "fn_accept(s, {}) got unexpected exception {}", | |
277 | ss.addr, peer_addr, eptr); | |
278 | ceph_abort(); | |
279 | }); | |
280 | }); | |
281 | }); | |
282 | }).handle_exception_type([&ss] (const std::system_error& e) { | |
283 | if (e.code() == std::errc::connection_aborted || | |
284 | e.code() == std::errc::invalid_argument) { | |
285 | logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})", | |
286 | ss.addr, e); | |
287 | } else { | |
288 | throw; | |
289 | } | |
290 | }).handle_exception([&ss] (auto eptr) { | |
291 | logger().error("FixedCPUServerSocket({})::accept(): " | |
292 | "got unexpected exception {}", ss.addr, eptr); | |
293 | ceph_abort(); | |
294 | }); | |
295 | }); | |
296 | }); | |
297 | } | |
298 | ||
f67539c2 TL |
299 | seastar::future<> shutdown(); |
300 | seastar::future<> destroy(); | |
301 | static seastar::future<FixedCPUServerSocket*> create(); | |
11fdf7f2 TL |
302 | }; |
303 | ||
9f95a23c | 304 | } // namespace crimson::net |