]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/Socket.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / net / Socket.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
9 #include <seastar/net/packet.hh>
10
11 #include "include/buffer.h"
12
13 #include "crimson/common/log.h"
14 #include "Errors.h"
15 #include "Fwd.h"
16
17 #ifdef UNIT_TESTS_BUILT
18 #include "Interceptor.h"
19 #endif
20
21 namespace crimson::net {
22
23 class Socket;
24 using SocketRef = std::unique_ptr<Socket>;
25
26 class Socket
27 {
28 struct construct_tag {};
29
30 public:
31 // if acceptor side, peer is using a different port (ephemeral_port)
32 // if connector side, I'm using a different port (ephemeral_port)
33 enum class side_t {
34 acceptor,
35 connector
36 };
37
38 Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
39 : sid{seastar::this_shard_id()},
40 socket(std::move(_socket)),
41 in(socket.input()),
42 // the default buffer size 8192 is too small that may impact our write
43 // performance. see seastar::net::connected_socket::output()
44 out(socket.output(65536)),
45 socket_is_shutdown(false),
46 side(_side),
47 ephemeral_port(e_port) {}
48
49 ~Socket() {
50 #ifndef NDEBUG
51 assert(closed);
52 #endif
53 }
54
55 Socket(Socket&& o) = delete;
56
57 static seastar::future<SocketRef>
58 connect(const entity_addr_t& peer_addr) {
59 inject_failure();
60 return inject_delay(
61 ).then([peer_addr] {
62 return seastar::connect(peer_addr.in4_addr());
63 }).then([] (seastar::connected_socket socket) {
64 return std::make_unique<Socket>(
65 std::move(socket), side_t::connector, 0, construct_tag{});
66 });
67 }
68
69 /// read the requested number of bytes into a bufferlist
70 seastar::future<bufferlist> read(size_t bytes);
71 using tmp_buf = seastar::temporary_buffer<char>;
72 using packet = seastar::net::packet;
73 seastar::future<tmp_buf> read_exactly(size_t bytes);
74
75 seastar::future<> write(packet&& buf) {
76 #ifdef UNIT_TESTS_BUILT
77 return try_trap_pre(next_trap_write
78 ).then([buf = std::move(buf), this] () mutable {
79 #endif
80 inject_failure();
81 return inject_delay(
82 ).then([buf = std::move(buf), this] () mutable {
83 return out.write(std::move(buf));
84 });
85 #ifdef UNIT_TESTS_BUILT
86 }).then([this] {
87 return try_trap_post(next_trap_write);
88 });
89 #endif
90 }
91 seastar::future<> flush() {
92 inject_failure();
93 return inject_delay().then([this] {
94 return out.flush();
95 });
96 }
97 seastar::future<> write_flush(packet&& buf) {
98 #ifdef UNIT_TESTS_BUILT
99 return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
100 #endif
101 inject_failure();
102 return inject_delay(
103 ).then([buf = std::move(buf), this] () mutable {
104 return out.write(std::move(buf)).then([this] { return out.flush(); });
105 });
106 #ifdef UNIT_TESTS_BUILT
107 }).then([this] {
108 return try_trap_post(next_trap_write);
109 });
110 #endif
111 }
112
113 bool is_shutdown() const {
114 return socket_is_shutdown;
115 }
116
117 // preemptively disable further reads or writes, can only be shutdown once.
118 void shutdown();
119
120 /// Socket can only be closed once.
121 seastar::future<> close();
122
123 static seastar::future<> inject_delay();
124
125 static void inject_failure();
126
127 // shutdown for tests
128 void force_shutdown() {
129 socket.shutdown_input();
130 socket.shutdown_output();
131 }
132
133 // shutdown input_stream only, for tests
134 void force_shutdown_in() {
135 socket.shutdown_input();
136 }
137
138 // shutdown output_stream only, for tests
139 void force_shutdown_out() {
140 socket.shutdown_output();
141 }
142
143 side_t get_side() const {
144 return side;
145 }
146
147 uint16_t get_ephemeral_port() const {
148 return ephemeral_port;
149 }
150
151 // learn my ephemeral_port as connector.
152 // unfortunately, there's no way to identify which port I'm using as
153 // connector with current seastar interface.
154 void learn_ephemeral_port_as_connector(uint16_t port) {
155 assert(side == side_t::connector &&
156 (ephemeral_port == 0 || ephemeral_port == port));
157 ephemeral_port = port;
158 }
159
160 seastar::socket_address get_local_address() const {
161 return socket.local_address();
162 }
163
164 private:
165 const seastar::shard_id sid;
166 seastar::connected_socket socket;
167 seastar::input_stream<char> in;
168 seastar::output_stream<char> out;
169 bool socket_is_shutdown;
170 side_t side;
171 uint16_t ephemeral_port;
172
173 #ifndef NDEBUG
174 bool closed = false;
175 #endif
176
177 /// buffer state for read()
178 struct {
179 bufferlist buffer;
180 size_t remaining;
181 } r;
182
183 #ifdef UNIT_TESTS_BUILT
184 public:
185 void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
186
187 private:
188 bp_action_t next_trap_read = bp_action_t::CONTINUE;
189 bp_action_t next_trap_write = bp_action_t::CONTINUE;
190 socket_blocker* blocker = nullptr;
191 seastar::future<> try_trap_pre(bp_action_t& trap);
192 seastar::future<> try_trap_post(bp_action_t& trap);
193
194 #endif
195 friend class FixedCPUServerSocket;
196 };
197
198 using listen_ertr = crimson::errorator<
199 crimson::ct_error::address_in_use, // The address is already bound
200 crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
201 >;
202
203 class FixedCPUServerSocket
204 : public seastar::peering_sharded_service<FixedCPUServerSocket> {
205 const seastar::shard_id cpu;
206 entity_addr_t addr;
207 std::optional<seastar::server_socket> listener;
208 seastar::gate shutdown_gate;
209
210 using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
211 std::unique_ptr<sharded_service_t> service;
212
213 struct construct_tag {};
214
215 static seastar::logger& logger() {
216 return crimson::get_logger(ceph_subsys_ms);
217 }
218
219 seastar::future<> reset() {
220 return container().invoke_on_all([] (auto& ss) {
221 assert(ss.shutdown_gate.is_closed());
222 ss.addr = entity_addr_t();
223 ss.listener.reset();
224 });
225 }
226
227 public:
228 FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
229 ~FixedCPUServerSocket() {
230 assert(!listener);
231 // detect whether user have called destroy() properly
232 ceph_assert(!service);
233 }
234
235 FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
236 FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
237 FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
238
239 listen_ertr::future<> listen(entity_addr_t addr);
240
241 // fn_accept should be a nothrow function of type
242 // seastar::future<>(SocketRef, entity_addr_t)
243 template <typename Func>
244 seastar::future<> accept(Func&& fn_accept) {
245 assert(seastar::this_shard_id() == cpu);
246 logger().trace("FixedCPUServerSocket({})::accept()...", addr);
247 return container().invoke_on_all(
248 [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
249 assert(ss.listener);
250 // gate accepting
251 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
252 // so ignore the returned future
253 std::ignore = seastar::with_gate(ss.shutdown_gate,
254 [&ss, fn_accept = std::move(fn_accept)] () mutable {
255 return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
256 return ss.listener->accept().then(
257 [&ss, fn_accept = std::move(fn_accept)]
258 (seastar::accept_result accept_result) mutable {
259 // assert seastar::listen_options::set_fixed_cpu() works
260 assert(seastar::this_shard_id() == ss.cpu);
261 auto [socket, paddr] = std::move(accept_result);
262 entity_addr_t peer_addr;
263 peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
264 peer_addr.set_type(ss.addr.get_type());
265 SocketRef _socket = std::make_unique<Socket>(
266 std::move(socket), Socket::side_t::acceptor,
267 peer_addr.get_port(), Socket::construct_tag{});
268 std::ignore = seastar::with_gate(ss.shutdown_gate,
269 [socket = std::move(_socket), peer_addr,
270 &ss, fn_accept = std::move(fn_accept)] () mutable {
271 logger().trace("FixedCPUServerSocket({})::accept(): "
272 "accepted peer {}", ss.addr, peer_addr);
273 return fn_accept(std::move(socket), peer_addr
274 ).handle_exception([&ss, peer_addr] (auto eptr) {
275 logger().error("FixedCPUServerSocket({})::accept(): "
276 "fn_accept(s, {}) got unexpected exception {}",
277 ss.addr, peer_addr, eptr);
278 ceph_abort();
279 });
280 });
281 });
282 }).handle_exception_type([&ss] (const std::system_error& e) {
283 if (e.code() == std::errc::connection_aborted ||
284 e.code() == std::errc::invalid_argument) {
285 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
286 ss.addr, e);
287 } else {
288 throw;
289 }
290 }).handle_exception([&ss] (auto eptr) {
291 logger().error("FixedCPUServerSocket({})::accept(): "
292 "got unexpected exception {}", ss.addr, eptr);
293 ceph_abort();
294 });
295 });
296 });
297 }
298
299 seastar::future<> shutdown();
300 seastar::future<> destroy();
301 static seastar::future<FixedCPUServerSocket*> create();
302 };
303
304 } // namespace crimson::net