]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
9f95a23c | 6 | #include <seastar/core/gate.hh> |
11fdf7f2 | 7 | #include <seastar/core/reactor.hh> |
9f95a23c | 8 | #include <seastar/core/sharded.hh> |
11fdf7f2 TL |
9 | #include <seastar/net/packet.hh> |
10 | ||
11 | #include "include/buffer.h" | |
9f95a23c | 12 | #include "msg/msg_types.h" |
11fdf7f2 | 13 | |
9f95a23c TL |
14 | #include "crimson/common/log.h" |
15 | #include "Errors.h" | |
16 | ||
17 | #ifdef UNIT_TESTS_BUILT | |
18 | #include "Interceptor.h" | |
19 | #endif | |
20 | ||
21 | namespace crimson::net { | |
22 | ||
23 | class Socket; | |
24 | using SocketRef = std::unique_ptr<Socket>; | |
11fdf7f2 TL |
25 | |
26 | class Socket | |
27 | { | |
28 | const seastar::shard_id sid; | |
29 | seastar::connected_socket socket; | |
30 | seastar::input_stream<char> in; | |
31 | seastar::output_stream<char> out; | |
32 | ||
9f95a23c TL |
33 | #ifndef NDEBUG |
34 | bool closed = false; | |
35 | #endif | |
36 | ||
11fdf7f2 TL |
37 | /// buffer state for read() |
38 | struct { | |
39 | bufferlist buffer; | |
40 | size_t remaining; | |
41 | } r; | |
42 | ||
9f95a23c TL |
43 | struct construct_tag {}; |
44 | ||
11fdf7f2 | 45 | public: |
9f95a23c | 46 | Socket(seastar::connected_socket&& _socket, construct_tag) |
11fdf7f2 TL |
47 | : sid{seastar::engine().cpu_id()}, |
48 | socket(std::move(_socket)), | |
49 | in(socket.input()), | |
9f95a23c TL |
50 | // the default buffer size 8192 is too small that may impact our write |
51 | // performance. see seastar::net::connected_socket::output() | |
52 | out(socket.output(65536)) {} | |
53 | ||
54 | ~Socket() { | |
55 | #ifndef NDEBUG | |
56 | assert(closed); | |
57 | #endif | |
58 | } | |
59 | ||
11fdf7f2 TL |
60 | Socket(Socket&& o) = delete; |
61 | ||
9f95a23c TL |
62 | static seastar::future<SocketRef> |
63 | connect(const entity_addr_t& peer_addr) { | |
64 | return seastar::connect(peer_addr.in4_addr() | |
65 | ).then([] (seastar::connected_socket socket) { | |
66 | return std::make_unique<Socket>(std::move(socket), construct_tag{}); | |
67 | }); | |
68 | } | |
69 | ||
11fdf7f2 TL |
70 | /// read the requested number of bytes into a bufferlist |
71 | seastar::future<bufferlist> read(size_t bytes); | |
72 | using tmp_buf = seastar::temporary_buffer<char>; | |
73 | using packet = seastar::net::packet; | |
74 | seastar::future<tmp_buf> read_exactly(size_t bytes); | |
75 | ||
76 | seastar::future<> write(packet&& buf) { | |
9f95a23c TL |
77 | #ifdef UNIT_TESTS_BUILT |
78 | return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { | |
79 | #endif | |
80 | return out.write(std::move(buf)); | |
81 | #ifdef UNIT_TESTS_BUILT | |
82 | }).then([this] { | |
83 | return try_trap_post(next_trap_write); | |
84 | }); | |
85 | #endif | |
11fdf7f2 TL |
86 | } |
87 | seastar::future<> flush() { | |
88 | return out.flush(); | |
89 | } | |
90 | seastar::future<> write_flush(packet&& buf) { | |
9f95a23c TL |
91 | #ifdef UNIT_TESTS_BUILT |
92 | return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { | |
93 | #endif | |
94 | return out.write(std::move(buf)).then([this] { return out.flush(); }); | |
95 | #ifdef UNIT_TESTS_BUILT | |
96 | }).then([this] { | |
97 | return try_trap_post(next_trap_write); | |
98 | }); | |
99 | #endif | |
11fdf7f2 TL |
100 | } |
101 | ||
9f95a23c TL |
102 | // preemptively disable further reads or writes, can only be shutdown once. |
103 | void shutdown(); | |
104 | ||
11fdf7f2 | 105 | /// Socket can only be closed once. |
9f95a23c TL |
106 | seastar::future<> close(); |
107 | ||
108 | // shutdown input_stream only, for tests | |
109 | void force_shutdown_in() { | |
110 | socket.shutdown_input(); | |
111 | } | |
112 | ||
113 | // shutdown output_stream only, for tests | |
114 | void force_shutdown_out() { | |
115 | socket.shutdown_output(); | |
116 | } | |
117 | ||
118 | #ifdef UNIT_TESTS_BUILT | |
119 | private: | |
120 | bp_action_t next_trap_read = bp_action_t::CONTINUE; | |
121 | bp_action_t next_trap_write = bp_action_t::CONTINUE; | |
122 | socket_blocker* blocker = nullptr; | |
123 | seastar::future<> try_trap_pre(bp_action_t& trap); | |
124 | seastar::future<> try_trap_post(bp_action_t& trap); | |
125 | ||
126 | public: | |
127 | void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); | |
128 | ||
129 | #endif | |
130 | friend class FixedCPUServerSocket; | |
131 | }; | |
132 | ||
133 | class FixedCPUServerSocket | |
134 | : public seastar::peering_sharded_service<FixedCPUServerSocket> { | |
135 | const seastar::shard_id cpu; | |
136 | entity_addr_t addr; | |
137 | std::optional<seastar::server_socket> listener; | |
138 | seastar::gate shutdown_gate; | |
139 | ||
140 | using sharded_service_t = seastar::sharded<FixedCPUServerSocket>; | |
141 | std::unique_ptr<sharded_service_t> service; | |
142 | ||
143 | struct construct_tag {}; | |
144 | ||
145 | static seastar::logger& logger() { | |
146 | return crimson::get_logger(ceph_subsys_ms); | |
147 | } | |
148 | ||
149 | seastar::future<> reset() { | |
150 | return container().invoke_on_all([] (auto& ss) { | |
151 | assert(ss.shutdown_gate.is_closed()); | |
152 | ss.shutdown_gate = seastar::gate(); | |
153 | ss.addr = entity_addr_t(); | |
154 | ss.listener.reset(); | |
155 | }); | |
156 | } | |
157 | ||
158 | public: | |
159 | FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {} | |
160 | ~FixedCPUServerSocket() { | |
161 | assert(!listener); | |
162 | // detect whether user have called destroy() properly | |
163 | ceph_assert(!service); | |
164 | } | |
165 | ||
166 | FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; | |
167 | FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; | |
168 | FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; | |
169 | ||
170 | seastar::future<> listen(entity_addr_t addr) { | |
171 | assert(seastar::engine().cpu_id() == cpu); | |
172 | logger().trace("FixedCPUServerSocket::listen({})...", addr); | |
173 | return container().invoke_on_all([addr] (auto& ss) { | |
174 | ss.addr = addr; | |
175 | seastar::socket_address s_addr(addr.in4_addr()); | |
176 | seastar::listen_options lo; | |
177 | lo.reuse_address = true; | |
178 | lo.set_fixed_cpu(ss.cpu); | |
179 | ss.listener = seastar::listen(s_addr, lo); | |
180 | }).handle_exception_type([addr] (const std::system_error& e) { | |
181 | if (e.code() == std::errc::address_in_use) { | |
182 | logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); | |
183 | throw; | |
184 | } else { | |
185 | logger().error("FixedCPUServerSocket::listen({}): " | |
186 | "got unexpeted error {}", addr, e); | |
187 | ceph_abort(); | |
188 | } | |
189 | }); | |
190 | } | |
191 | ||
192 | // fn_accept should be a nothrow function of type | |
193 | // seastar::future<>(SocketRef, entity_addr_t) | |
194 | template <typename Func> | |
195 | seastar::future<> accept(Func&& fn_accept) { | |
196 | assert(seastar::engine().cpu_id() == cpu); | |
197 | logger().trace("FixedCPUServerSocket({})::accept()...", addr); | |
198 | return container().invoke_on_all( | |
199 | [fn_accept = std::move(fn_accept)] (auto& ss) mutable { | |
200 | assert(ss.listener); | |
201 | // gate accepting | |
202 | // FixedCPUServerSocket::shutdown() will drain the continuations in the gate | |
203 | // so ignore the returned future | |
204 | std::ignore = seastar::with_gate(ss.shutdown_gate, | |
205 | [&ss, fn_accept = std::move(fn_accept)] () mutable { | |
206 | return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable { | |
207 | return ss.listener->accept().then( | |
208 | [&ss, fn_accept = std::move(fn_accept)] | |
209 | (seastar::accept_result accept_result) mutable { | |
210 | // assert seastar::listen_options::set_fixed_cpu() works | |
211 | assert(seastar::engine().cpu_id() == ss.cpu); | |
212 | auto [socket, paddr] = std::move(accept_result); | |
213 | entity_addr_t peer_addr; | |
214 | peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); | |
215 | peer_addr.set_type(entity_addr_t::TYPE_ANY); | |
216 | SocketRef _socket = std::make_unique<Socket>( | |
217 | std::move(socket), Socket::construct_tag{}); | |
218 | std::ignore = seastar::with_gate(ss.shutdown_gate, | |
219 | [socket = std::move(_socket), peer_addr, | |
220 | &ss, fn_accept = std::move(fn_accept)] () mutable { | |
221 | logger().trace("FixedCPUServerSocket({})::accept(): " | |
222 | "accepted peer {}", ss.addr, peer_addr); | |
223 | return fn_accept(std::move(socket), peer_addr | |
224 | ).handle_exception([&ss, peer_addr] (auto eptr) { | |
225 | logger().error("FixedCPUServerSocket({})::accept(): " | |
226 | "fn_accept(s, {}) got unexpected exception {}", | |
227 | ss.addr, peer_addr, eptr); | |
228 | ceph_abort(); | |
229 | }); | |
230 | }); | |
231 | }); | |
232 | }).handle_exception_type([&ss] (const std::system_error& e) { | |
233 | if (e.code() == std::errc::connection_aborted || | |
234 | e.code() == std::errc::invalid_argument) { | |
235 | logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})", | |
236 | ss.addr, e); | |
237 | } else { | |
238 | throw; | |
239 | } | |
240 | }).handle_exception([&ss] (auto eptr) { | |
241 | logger().error("FixedCPUServerSocket({})::accept(): " | |
242 | "got unexpected exception {}", ss.addr, eptr); | |
243 | ceph_abort(); | |
244 | }); | |
245 | }); | |
246 | }); | |
247 | } | |
248 | ||
249 | seastar::future<> shutdown() { | |
250 | assert(seastar::engine().cpu_id() == cpu); | |
251 | logger().trace("FixedCPUServerSocket({})::shutdown()...", addr); | |
252 | return container().invoke_on_all([] (auto& ss) { | |
253 | if (ss.listener) { | |
254 | ss.listener->abort_accept(); | |
255 | } | |
256 | return ss.shutdown_gate.close(); | |
257 | }).then([this] { | |
258 | return reset(); | |
259 | }); | |
260 | } | |
261 | ||
262 | seastar::future<> destroy() { | |
263 | assert(seastar::engine().cpu_id() == cpu); | |
264 | return shutdown().then([this] { | |
265 | // we should only construct/stop shards on #0 | |
266 | return container().invoke_on(0, [] (auto& ss) { | |
267 | assert(ss.service); | |
268 | return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); | |
269 | }); | |
270 | }); | |
271 | } | |
272 | ||
273 | static seastar::future<FixedCPUServerSocket*> create() { | |
274 | auto cpu = seastar::engine().cpu_id(); | |
275 | // we should only construct/stop shards on #0 | |
276 | return seastar::smp::submit_to(0, [cpu] { | |
277 | auto service = std::make_unique<sharded_service_t>(); | |
278 | return service->start(cpu, construct_tag{} | |
279 | ).then([service = std::move(service)] () mutable { | |
280 | auto p_shard = service.get(); | |
281 | p_shard->local().service = std::move(service); | |
282 | return p_shard; | |
11fdf7f2 | 283 | }); |
9f95a23c TL |
284 | }).then([] (auto p_shard) { |
285 | return &p_shard->local(); | |
286 | }); | |
11fdf7f2 TL |
287 | } |
288 | }; | |
289 | ||
9f95a23c | 290 | } // namespace crimson::net |