]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
9f95a23c | 6 | #include <seastar/core/gate.hh> |
11fdf7f2 | 7 | #include <seastar/core/reactor.hh> |
9f95a23c | 8 | #include <seastar/core/sharded.hh> |
11fdf7f2 TL |
9 | #include <seastar/net/packet.hh> |
10 | ||
11 | #include "include/buffer.h" | |
12 | ||
9f95a23c TL |
13 | #include "crimson/common/log.h" |
14 | #include "Errors.h" | |
f67539c2 | 15 | #include "Fwd.h" |
9f95a23c TL |
16 | |
17 | #ifdef UNIT_TESTS_BUILT | |
18 | #include "Interceptor.h" | |
19 | #endif | |
20 | ||
21 | namespace crimson::net { | |
22 | ||
// Socket is defined below; declare it first so the owning-pointer alias
// can be spelled ahead of the class body.
class Socket;

// Exclusive-ownership handle to a Socket.
using SocketRef = std::unique_ptr<Socket>;
11fdf7f2 TL |
25 | |
26 | class Socket | |
27 | { | |
9f95a23c TL |
28 | struct construct_tag {}; |
29 | ||
11fdf7f2 | 30 | public: |
f67539c2 TL |
31 | // if acceptor side, peer is using a different port (ephemeral_port) |
32 | // if connector side, I'm using a different port (ephemeral_port) | |
33 | enum class side_t { | |
34 | acceptor, | |
35 | connector | |
36 | }; | |
37 | ||
38 | Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag) | |
39 | : sid{seastar::this_shard_id()}, | |
11fdf7f2 TL |
40 | socket(std::move(_socket)), |
41 | in(socket.input()), | |
9f95a23c TL |
42 | // the default buffer size 8192 is too small that may impact our write |
43 | // performance. see seastar::net::connected_socket::output() | |
f67539c2 TL |
44 | out(socket.output(65536)), |
45 | side(_side), | |
46 | ephemeral_port(e_port) {} | |
9f95a23c TL |
47 | |
48 | ~Socket() { | |
49 | #ifndef NDEBUG | |
50 | assert(closed); | |
51 | #endif | |
52 | } | |
53 | ||
11fdf7f2 TL |
54 | Socket(Socket&& o) = delete; |
55 | ||
9f95a23c TL |
56 | static seastar::future<SocketRef> |
57 | connect(const entity_addr_t& peer_addr) { | |
58 | return seastar::connect(peer_addr.in4_addr() | |
59 | ).then([] (seastar::connected_socket socket) { | |
f67539c2 TL |
60 | return std::make_unique<Socket>( |
61 | std::move(socket), side_t::connector, 0, construct_tag{}); | |
9f95a23c TL |
62 | }); |
63 | } | |
64 | ||
11fdf7f2 TL |
65 | /// read the requested number of bytes into a bufferlist |
66 | seastar::future<bufferlist> read(size_t bytes); | |
67 | using tmp_buf = seastar::temporary_buffer<char>; | |
68 | using packet = seastar::net::packet; | |
69 | seastar::future<tmp_buf> read_exactly(size_t bytes); | |
70 | ||
71 | seastar::future<> write(packet&& buf) { | |
9f95a23c TL |
72 | #ifdef UNIT_TESTS_BUILT |
73 | return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { | |
74 | #endif | |
75 | return out.write(std::move(buf)); | |
76 | #ifdef UNIT_TESTS_BUILT | |
77 | }).then([this] { | |
78 | return try_trap_post(next_trap_write); | |
79 | }); | |
80 | #endif | |
11fdf7f2 TL |
81 | } |
82 | seastar::future<> flush() { | |
83 | return out.flush(); | |
84 | } | |
85 | seastar::future<> write_flush(packet&& buf) { | |
9f95a23c TL |
86 | #ifdef UNIT_TESTS_BUILT |
87 | return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { | |
88 | #endif | |
89 | return out.write(std::move(buf)).then([this] { return out.flush(); }); | |
90 | #ifdef UNIT_TESTS_BUILT | |
91 | }).then([this] { | |
92 | return try_trap_post(next_trap_write); | |
93 | }); | |
94 | #endif | |
11fdf7f2 TL |
95 | } |
96 | ||
9f95a23c TL |
97 | // preemptively disable further reads or writes, can only be shutdown once. |
98 | void shutdown(); | |
99 | ||
11fdf7f2 | 100 | /// Socket can only be closed once. |
9f95a23c TL |
101 | seastar::future<> close(); |
102 | ||
103 | // shutdown input_stream only, for tests | |
104 | void force_shutdown_in() { | |
105 | socket.shutdown_input(); | |
106 | } | |
107 | ||
108 | // shutdown output_stream only, for tests | |
109 | void force_shutdown_out() { | |
110 | socket.shutdown_output(); | |
111 | } | |
112 | ||
f67539c2 TL |
113 | side_t get_side() const { |
114 | return side; | |
115 | } | |
116 | ||
117 | uint16_t get_ephemeral_port() const { | |
118 | return ephemeral_port; | |
119 | } | |
120 | ||
121 | // learn my ephemeral_port as connector. | |
122 | // unfortunately, there's no way to identify which port I'm using as | |
123 | // connector with current seastar interface. | |
124 | void learn_ephemeral_port_as_connector(uint16_t port) { | |
125 | assert(side == side_t::connector && | |
126 | (ephemeral_port == 0 || ephemeral_port == port)); | |
127 | ephemeral_port = port; | |
128 | } | |
129 | ||
20effc67 TL |
130 | seastar::socket_address get_local_address() const { |
131 | return socket.local_address(); | |
132 | } | |
133 | ||
f67539c2 TL |
134 | private: |
135 | const seastar::shard_id sid; | |
136 | seastar::connected_socket socket; | |
137 | seastar::input_stream<char> in; | |
138 | seastar::output_stream<char> out; | |
139 | side_t side; | |
140 | uint16_t ephemeral_port; | |
141 | ||
142 | #ifndef NDEBUG | |
143 | bool closed = false; | |
144 | #endif | |
145 | ||
146 | /// buffer state for read() | |
147 | struct { | |
148 | bufferlist buffer; | |
149 | size_t remaining; | |
150 | } r; | |
151 | ||
9f95a23c | 152 | #ifdef UNIT_TESTS_BUILT |
f67539c2 TL |
153 | public: |
154 | void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); | |
155 | ||
9f95a23c TL |
156 | private: |
157 | bp_action_t next_trap_read = bp_action_t::CONTINUE; | |
158 | bp_action_t next_trap_write = bp_action_t::CONTINUE; | |
159 | socket_blocker* blocker = nullptr; | |
160 | seastar::future<> try_trap_pre(bp_action_t& trap); | |
161 | seastar::future<> try_trap_post(bp_action_t& trap); | |
162 | ||
9f95a23c TL |
163 | #endif |
164 | friend class FixedCPUServerSocket; | |
165 | }; | |
166 | ||
20effc67 TL |
167 | using listen_ertr = crimson::errorator< |
168 | crimson::ct_error::address_in_use, // The address is already bound | |
169 | crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/ | |
170 | >; | |
171 | ||
9f95a23c TL |
172 | class FixedCPUServerSocket |
173 | : public seastar::peering_sharded_service<FixedCPUServerSocket> { | |
174 | const seastar::shard_id cpu; | |
175 | entity_addr_t addr; | |
176 | std::optional<seastar::server_socket> listener; | |
177 | seastar::gate shutdown_gate; | |
178 | ||
179 | using sharded_service_t = seastar::sharded<FixedCPUServerSocket>; | |
180 | std::unique_ptr<sharded_service_t> service; | |
181 | ||
182 | struct construct_tag {}; | |
183 | ||
184 | static seastar::logger& logger() { | |
185 | return crimson::get_logger(ceph_subsys_ms); | |
186 | } | |
187 | ||
188 | seastar::future<> reset() { | |
189 | return container().invoke_on_all([] (auto& ss) { | |
190 | assert(ss.shutdown_gate.is_closed()); | |
9f95a23c TL |
191 | ss.addr = entity_addr_t(); |
192 | ss.listener.reset(); | |
193 | }); | |
194 | } | |
195 | ||
196 | public: | |
197 | FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {} | |
198 | ~FixedCPUServerSocket() { | |
199 | assert(!listener); | |
200 | // detect whether user have called destroy() properly | |
201 | ceph_assert(!service); | |
202 | } | |
203 | ||
204 | FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; | |
205 | FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; | |
206 | FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; | |
207 | ||
f67539c2 | 208 | listen_ertr::future<> listen(entity_addr_t addr); |
9f95a23c TL |
209 | |
210 | // fn_accept should be a nothrow function of type | |
211 | // seastar::future<>(SocketRef, entity_addr_t) | |
212 | template <typename Func> | |
213 | seastar::future<> accept(Func&& fn_accept) { | |
f67539c2 | 214 | assert(seastar::this_shard_id() == cpu); |
9f95a23c TL |
215 | logger().trace("FixedCPUServerSocket({})::accept()...", addr); |
216 | return container().invoke_on_all( | |
217 | [fn_accept = std::move(fn_accept)] (auto& ss) mutable { | |
218 | assert(ss.listener); | |
219 | // gate accepting | |
220 | // FixedCPUServerSocket::shutdown() will drain the continuations in the gate | |
221 | // so ignore the returned future | |
222 | std::ignore = seastar::with_gate(ss.shutdown_gate, | |
223 | [&ss, fn_accept = std::move(fn_accept)] () mutable { | |
224 | return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable { | |
225 | return ss.listener->accept().then( | |
226 | [&ss, fn_accept = std::move(fn_accept)] | |
227 | (seastar::accept_result accept_result) mutable { | |
228 | // assert seastar::listen_options::set_fixed_cpu() works | |
f67539c2 | 229 | assert(seastar::this_shard_id() == ss.cpu); |
9f95a23c TL |
230 | auto [socket, paddr] = std::move(accept_result); |
231 | entity_addr_t peer_addr; | |
232 | peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); | |
20effc67 | 233 | peer_addr.set_type(ss.addr.get_type()); |
9f95a23c | 234 | SocketRef _socket = std::make_unique<Socket>( |
f67539c2 TL |
235 | std::move(socket), Socket::side_t::acceptor, |
236 | peer_addr.get_port(), Socket::construct_tag{}); | |
9f95a23c TL |
237 | std::ignore = seastar::with_gate(ss.shutdown_gate, |
238 | [socket = std::move(_socket), peer_addr, | |
239 | &ss, fn_accept = std::move(fn_accept)] () mutable { | |
240 | logger().trace("FixedCPUServerSocket({})::accept(): " | |
241 | "accepted peer {}", ss.addr, peer_addr); | |
242 | return fn_accept(std::move(socket), peer_addr | |
243 | ).handle_exception([&ss, peer_addr] (auto eptr) { | |
244 | logger().error("FixedCPUServerSocket({})::accept(): " | |
245 | "fn_accept(s, {}) got unexpected exception {}", | |
246 | ss.addr, peer_addr, eptr); | |
247 | ceph_abort(); | |
248 | }); | |
249 | }); | |
250 | }); | |
251 | }).handle_exception_type([&ss] (const std::system_error& e) { | |
252 | if (e.code() == std::errc::connection_aborted || | |
253 | e.code() == std::errc::invalid_argument) { | |
254 | logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})", | |
255 | ss.addr, e); | |
256 | } else { | |
257 | throw; | |
258 | } | |
259 | }).handle_exception([&ss] (auto eptr) { | |
260 | logger().error("FixedCPUServerSocket({})::accept(): " | |
261 | "got unexpected exception {}", ss.addr, eptr); | |
262 | ceph_abort(); | |
263 | }); | |
264 | }); | |
265 | }); | |
266 | } | |
267 | ||
f67539c2 TL |
268 | seastar::future<> shutdown(); |
269 | seastar::future<> destroy(); | |
270 | static seastar::future<FixedCPUServerSocket*> create(); | |
11fdf7f2 TL |
271 | }; |
272 | ||
9f95a23c | 273 | } // namespace crimson::net |