]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/Socket.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / Socket.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#pragma once
5
9f95a23c 6#include <seastar/core/gate.hh>
11fdf7f2 7#include <seastar/core/reactor.hh>
9f95a23c 8#include <seastar/core/sharded.hh>
11fdf7f2
TL
9#include <seastar/net/packet.hh>
10
11#include "include/buffer.h"
12
9f95a23c
TL
13#include "crimson/common/log.h"
14#include "Errors.h"
f67539c2 15#include "Fwd.h"
9f95a23c
TL
16
17#ifdef UNIT_TESTS_BUILT
18#include "Interceptor.h"
19#endif
20
21namespace crimson::net {
22
23class Socket;
24using SocketRef = std::unique_ptr<Socket>;
11fdf7f2
TL
25
26class Socket
27{
9f95a23c
TL
28 struct construct_tag {};
29
11fdf7f2 30 public:
f67539c2
TL
31 // if acceptor side, peer is using a different port (ephemeral_port)
32 // if connector side, I'm using a different port (ephemeral_port)
33 enum class side_t {
34 acceptor,
35 connector
36 };
37
38 Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
39 : sid{seastar::this_shard_id()},
11fdf7f2
TL
40 socket(std::move(_socket)),
41 in(socket.input()),
9f95a23c
TL
42 // the default buffer size 8192 is too small that may impact our write
43 // performance. see seastar::net::connected_socket::output()
f67539c2
TL
44 out(socket.output(65536)),
45 side(_side),
46 ephemeral_port(e_port) {}
9f95a23c
TL
47
48 ~Socket() {
49#ifndef NDEBUG
50 assert(closed);
51#endif
52 }
53
11fdf7f2
TL
54 Socket(Socket&& o) = delete;
55
9f95a23c
TL
56 static seastar::future<SocketRef>
57 connect(const entity_addr_t& peer_addr) {
58 return seastar::connect(peer_addr.in4_addr()
59 ).then([] (seastar::connected_socket socket) {
f67539c2
TL
60 return std::make_unique<Socket>(
61 std::move(socket), side_t::connector, 0, construct_tag{});
9f95a23c
TL
62 });
63 }
64
11fdf7f2
TL
65 /// read the requested number of bytes into a bufferlist
66 seastar::future<bufferlist> read(size_t bytes);
67 using tmp_buf = seastar::temporary_buffer<char>;
68 using packet = seastar::net::packet;
69 seastar::future<tmp_buf> read_exactly(size_t bytes);
70
71 seastar::future<> write(packet&& buf) {
9f95a23c
TL
72#ifdef UNIT_TESTS_BUILT
73 return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
74#endif
75 return out.write(std::move(buf));
76#ifdef UNIT_TESTS_BUILT
77 }).then([this] {
78 return try_trap_post(next_trap_write);
79 });
80#endif
11fdf7f2
TL
81 }
82 seastar::future<> flush() {
83 return out.flush();
84 }
85 seastar::future<> write_flush(packet&& buf) {
9f95a23c
TL
86#ifdef UNIT_TESTS_BUILT
87 return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
88#endif
89 return out.write(std::move(buf)).then([this] { return out.flush(); });
90#ifdef UNIT_TESTS_BUILT
91 }).then([this] {
92 return try_trap_post(next_trap_write);
93 });
94#endif
11fdf7f2
TL
95 }
96
9f95a23c
TL
97 // preemptively disable further reads or writes, can only be shutdown once.
98 void shutdown();
99
11fdf7f2 100 /// Socket can only be closed once.
9f95a23c
TL
101 seastar::future<> close();
102
103 // shutdown input_stream only, for tests
104 void force_shutdown_in() {
105 socket.shutdown_input();
106 }
107
108 // shutdown output_stream only, for tests
109 void force_shutdown_out() {
110 socket.shutdown_output();
111 }
112
f67539c2
TL
113 side_t get_side() const {
114 return side;
115 }
116
117 uint16_t get_ephemeral_port() const {
118 return ephemeral_port;
119 }
120
121 // learn my ephemeral_port as connector.
122 // unfortunately, there's no way to identify which port I'm using as
123 // connector with current seastar interface.
124 void learn_ephemeral_port_as_connector(uint16_t port) {
125 assert(side == side_t::connector &&
126 (ephemeral_port == 0 || ephemeral_port == port));
127 ephemeral_port = port;
128 }
129
20effc67
TL
130 seastar::socket_address get_local_address() const {
131 return socket.local_address();
132 }
133
f67539c2
TL
134 private:
135 const seastar::shard_id sid;
136 seastar::connected_socket socket;
137 seastar::input_stream<char> in;
138 seastar::output_stream<char> out;
139 side_t side;
140 uint16_t ephemeral_port;
141
142#ifndef NDEBUG
143 bool closed = false;
144#endif
145
146 /// buffer state for read()
147 struct {
148 bufferlist buffer;
149 size_t remaining;
150 } r;
151
9f95a23c 152#ifdef UNIT_TESTS_BUILT
f67539c2
TL
153 public:
154 void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
155
9f95a23c
TL
156 private:
157 bp_action_t next_trap_read = bp_action_t::CONTINUE;
158 bp_action_t next_trap_write = bp_action_t::CONTINUE;
159 socket_blocker* blocker = nullptr;
160 seastar::future<> try_trap_pre(bp_action_t& trap);
161 seastar::future<> try_trap_post(bp_action_t& trap);
162
9f95a23c
TL
163#endif
164 friend class FixedCPUServerSocket;
165};
166
20effc67
TL
167using listen_ertr = crimson::errorator<
168 crimson::ct_error::address_in_use, // The address is already bound
169 crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
170 >;
171
9f95a23c
TL
172class FixedCPUServerSocket
173 : public seastar::peering_sharded_service<FixedCPUServerSocket> {
174 const seastar::shard_id cpu;
175 entity_addr_t addr;
176 std::optional<seastar::server_socket> listener;
177 seastar::gate shutdown_gate;
178
179 using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
180 std::unique_ptr<sharded_service_t> service;
181
182 struct construct_tag {};
183
184 static seastar::logger& logger() {
185 return crimson::get_logger(ceph_subsys_ms);
186 }
187
188 seastar::future<> reset() {
189 return container().invoke_on_all([] (auto& ss) {
190 assert(ss.shutdown_gate.is_closed());
9f95a23c
TL
191 ss.addr = entity_addr_t();
192 ss.listener.reset();
193 });
194 }
195
196public:
197 FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
198 ~FixedCPUServerSocket() {
199 assert(!listener);
200 // detect whether user have called destroy() properly
201 ceph_assert(!service);
202 }
203
204 FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
205 FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
206 FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
207
f67539c2 208 listen_ertr::future<> listen(entity_addr_t addr);
9f95a23c
TL
209
210 // fn_accept should be a nothrow function of type
211 // seastar::future<>(SocketRef, entity_addr_t)
212 template <typename Func>
213 seastar::future<> accept(Func&& fn_accept) {
f67539c2 214 assert(seastar::this_shard_id() == cpu);
9f95a23c
TL
215 logger().trace("FixedCPUServerSocket({})::accept()...", addr);
216 return container().invoke_on_all(
217 [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
218 assert(ss.listener);
219 // gate accepting
220 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
221 // so ignore the returned future
222 std::ignore = seastar::with_gate(ss.shutdown_gate,
223 [&ss, fn_accept = std::move(fn_accept)] () mutable {
224 return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
225 return ss.listener->accept().then(
226 [&ss, fn_accept = std::move(fn_accept)]
227 (seastar::accept_result accept_result) mutable {
228 // assert seastar::listen_options::set_fixed_cpu() works
f67539c2 229 assert(seastar::this_shard_id() == ss.cpu);
9f95a23c
TL
230 auto [socket, paddr] = std::move(accept_result);
231 entity_addr_t peer_addr;
232 peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
20effc67 233 peer_addr.set_type(ss.addr.get_type());
9f95a23c 234 SocketRef _socket = std::make_unique<Socket>(
f67539c2
TL
235 std::move(socket), Socket::side_t::acceptor,
236 peer_addr.get_port(), Socket::construct_tag{});
9f95a23c
TL
237 std::ignore = seastar::with_gate(ss.shutdown_gate,
238 [socket = std::move(_socket), peer_addr,
239 &ss, fn_accept = std::move(fn_accept)] () mutable {
240 logger().trace("FixedCPUServerSocket({})::accept(): "
241 "accepted peer {}", ss.addr, peer_addr);
242 return fn_accept(std::move(socket), peer_addr
243 ).handle_exception([&ss, peer_addr] (auto eptr) {
244 logger().error("FixedCPUServerSocket({})::accept(): "
245 "fn_accept(s, {}) got unexpected exception {}",
246 ss.addr, peer_addr, eptr);
247 ceph_abort();
248 });
249 });
250 });
251 }).handle_exception_type([&ss] (const std::system_error& e) {
252 if (e.code() == std::errc::connection_aborted ||
253 e.code() == std::errc::invalid_argument) {
254 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
255 ss.addr, e);
256 } else {
257 throw;
258 }
259 }).handle_exception([&ss] (auto eptr) {
260 logger().error("FixedCPUServerSocket({})::accept(): "
261 "got unexpected exception {}", ss.addr, eptr);
262 ceph_abort();
263 });
264 });
265 });
266 }
267
f67539c2
TL
268 seastar::future<> shutdown();
269 seastar::future<> destroy();
270 static seastar::future<FixedCPUServerSocket*> create();
11fdf7f2
TL
271};
272
9f95a23c 273} // namespace crimson::net