]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/Socket.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / net / Socket.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
9 #include <seastar/net/packet.hh>
10
11 #include "include/buffer.h"
12 #include "msg/msg_types.h"
13
14 #include "crimson/common/log.h"
15 #include "Errors.h"
16
17 #ifdef UNIT_TESTS_BUILT
18 #include "Interceptor.h"
19 #endif
20
21 namespace crimson::net {
22
23 class Socket;
24 using SocketRef = std::unique_ptr<Socket>;
25
26 class Socket
27 {
28 const seastar::shard_id sid;
29 seastar::connected_socket socket;
30 seastar::input_stream<char> in;
31 seastar::output_stream<char> out;
32
33 #ifndef NDEBUG
34 bool closed = false;
35 #endif
36
37 /// buffer state for read()
38 struct {
39 bufferlist buffer;
40 size_t remaining;
41 } r;
42
43 struct construct_tag {};
44
45 public:
46 Socket(seastar::connected_socket&& _socket, construct_tag)
47 : sid{seastar::engine().cpu_id()},
48 socket(std::move(_socket)),
49 in(socket.input()),
50 // the default buffer size 8192 is too small that may impact our write
51 // performance. see seastar::net::connected_socket::output()
52 out(socket.output(65536)) {}
53
54 ~Socket() {
55 #ifndef NDEBUG
56 assert(closed);
57 #endif
58 }
59
60 Socket(Socket&& o) = delete;
61
62 static seastar::future<SocketRef>
63 connect(const entity_addr_t& peer_addr) {
64 return seastar::connect(peer_addr.in4_addr()
65 ).then([] (seastar::connected_socket socket) {
66 return std::make_unique<Socket>(std::move(socket), construct_tag{});
67 });
68 }
69
70 /// read the requested number of bytes into a bufferlist
71 seastar::future<bufferlist> read(size_t bytes);
72 using tmp_buf = seastar::temporary_buffer<char>;
73 using packet = seastar::net::packet;
74 seastar::future<tmp_buf> read_exactly(size_t bytes);
75
76 seastar::future<> write(packet&& buf) {
77 #ifdef UNIT_TESTS_BUILT
78 return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
79 #endif
80 return out.write(std::move(buf));
81 #ifdef UNIT_TESTS_BUILT
82 }).then([this] {
83 return try_trap_post(next_trap_write);
84 });
85 #endif
86 }
87 seastar::future<> flush() {
88 return out.flush();
89 }
90 seastar::future<> write_flush(packet&& buf) {
91 #ifdef UNIT_TESTS_BUILT
92 return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
93 #endif
94 return out.write(std::move(buf)).then([this] { return out.flush(); });
95 #ifdef UNIT_TESTS_BUILT
96 }).then([this] {
97 return try_trap_post(next_trap_write);
98 });
99 #endif
100 }
101
102 // preemptively disable further reads or writes, can only be shutdown once.
103 void shutdown();
104
105 /// Socket can only be closed once.
106 seastar::future<> close();
107
108 // shutdown input_stream only, for tests
109 void force_shutdown_in() {
110 socket.shutdown_input();
111 }
112
113 // shutdown output_stream only, for tests
114 void force_shutdown_out() {
115 socket.shutdown_output();
116 }
117
118 #ifdef UNIT_TESTS_BUILT
119 private:
120 bp_action_t next_trap_read = bp_action_t::CONTINUE;
121 bp_action_t next_trap_write = bp_action_t::CONTINUE;
122 socket_blocker* blocker = nullptr;
123 seastar::future<> try_trap_pre(bp_action_t& trap);
124 seastar::future<> try_trap_post(bp_action_t& trap);
125
126 public:
127 void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
128
129 #endif
130 friend class FixedCPUServerSocket;
131 };
132
133 class FixedCPUServerSocket
134 : public seastar::peering_sharded_service<FixedCPUServerSocket> {
135 const seastar::shard_id cpu;
136 entity_addr_t addr;
137 std::optional<seastar::server_socket> listener;
138 seastar::gate shutdown_gate;
139
140 using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
141 std::unique_ptr<sharded_service_t> service;
142
143 struct construct_tag {};
144
145 static seastar::logger& logger() {
146 return crimson::get_logger(ceph_subsys_ms);
147 }
148
149 seastar::future<> reset() {
150 return container().invoke_on_all([] (auto& ss) {
151 assert(ss.shutdown_gate.is_closed());
152 ss.shutdown_gate = seastar::gate();
153 ss.addr = entity_addr_t();
154 ss.listener.reset();
155 });
156 }
157
158 public:
159 FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
160 ~FixedCPUServerSocket() {
161 assert(!listener);
162 // detect whether user have called destroy() properly
163 ceph_assert(!service);
164 }
165
166 FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
167 FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
168 FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
169
170 seastar::future<> listen(entity_addr_t addr) {
171 assert(seastar::engine().cpu_id() == cpu);
172 logger().trace("FixedCPUServerSocket::listen({})...", addr);
173 return container().invoke_on_all([addr] (auto& ss) {
174 ss.addr = addr;
175 seastar::socket_address s_addr(addr.in4_addr());
176 seastar::listen_options lo;
177 lo.reuse_address = true;
178 lo.set_fixed_cpu(ss.cpu);
179 ss.listener = seastar::listen(s_addr, lo);
180 }).handle_exception_type([addr] (const std::system_error& e) {
181 if (e.code() == std::errc::address_in_use) {
182 logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
183 throw;
184 } else {
185 logger().error("FixedCPUServerSocket::listen({}): "
186 "got unexpeted error {}", addr, e);
187 ceph_abort();
188 }
189 });
190 }
191
192 // fn_accept should be a nothrow function of type
193 // seastar::future<>(SocketRef, entity_addr_t)
194 template <typename Func>
195 seastar::future<> accept(Func&& fn_accept) {
196 assert(seastar::engine().cpu_id() == cpu);
197 logger().trace("FixedCPUServerSocket({})::accept()...", addr);
198 return container().invoke_on_all(
199 [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
200 assert(ss.listener);
201 // gate accepting
202 // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
203 // so ignore the returned future
204 std::ignore = seastar::with_gate(ss.shutdown_gate,
205 [&ss, fn_accept = std::move(fn_accept)] () mutable {
206 return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
207 return ss.listener->accept().then(
208 [&ss, fn_accept = std::move(fn_accept)]
209 (seastar::accept_result accept_result) mutable {
210 // assert seastar::listen_options::set_fixed_cpu() works
211 assert(seastar::engine().cpu_id() == ss.cpu);
212 auto [socket, paddr] = std::move(accept_result);
213 entity_addr_t peer_addr;
214 peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
215 peer_addr.set_type(entity_addr_t::TYPE_ANY);
216 SocketRef _socket = std::make_unique<Socket>(
217 std::move(socket), Socket::construct_tag{});
218 std::ignore = seastar::with_gate(ss.shutdown_gate,
219 [socket = std::move(_socket), peer_addr,
220 &ss, fn_accept = std::move(fn_accept)] () mutable {
221 logger().trace("FixedCPUServerSocket({})::accept(): "
222 "accepted peer {}", ss.addr, peer_addr);
223 return fn_accept(std::move(socket), peer_addr
224 ).handle_exception([&ss, peer_addr] (auto eptr) {
225 logger().error("FixedCPUServerSocket({})::accept(): "
226 "fn_accept(s, {}) got unexpected exception {}",
227 ss.addr, peer_addr, eptr);
228 ceph_abort();
229 });
230 });
231 });
232 }).handle_exception_type([&ss] (const std::system_error& e) {
233 if (e.code() == std::errc::connection_aborted ||
234 e.code() == std::errc::invalid_argument) {
235 logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
236 ss.addr, e);
237 } else {
238 throw;
239 }
240 }).handle_exception([&ss] (auto eptr) {
241 logger().error("FixedCPUServerSocket({})::accept(): "
242 "got unexpected exception {}", ss.addr, eptr);
243 ceph_abort();
244 });
245 });
246 });
247 }
248
249 seastar::future<> shutdown() {
250 assert(seastar::engine().cpu_id() == cpu);
251 logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
252 return container().invoke_on_all([] (auto& ss) {
253 if (ss.listener) {
254 ss.listener->abort_accept();
255 }
256 return ss.shutdown_gate.close();
257 }).then([this] {
258 return reset();
259 });
260 }
261
262 seastar::future<> destroy() {
263 assert(seastar::engine().cpu_id() == cpu);
264 return shutdown().then([this] {
265 // we should only construct/stop shards on #0
266 return container().invoke_on(0, [] (auto& ss) {
267 assert(ss.service);
268 return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
269 });
270 });
271 }
272
273 static seastar::future<FixedCPUServerSocket*> create() {
274 auto cpu = seastar::engine().cpu_id();
275 // we should only construct/stop shards on #0
276 return seastar::smp::submit_to(0, [cpu] {
277 auto service = std::make_unique<sharded_service_t>();
278 return service->start(cpu, construct_tag{}
279 ).then([service = std::move(service)] () mutable {
280 auto p_shard = service.get();
281 p_shard->local().service = std::move(service);
282 return p_shard;
283 });
284 }).then([] (auto p_shard) {
285 return &p_shard->local();
286 });
287 }
288 };
289
290 } // namespace crimson::net