]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "Socket.h" | |
5 | ||
1e59de90 | 6 | #include <seastar/core/sleep.hh> |
f67539c2 | 7 | #include <seastar/core/when_all.hh> |
aee94f69 | 8 | #include <seastar/net/packet.hh> |
f67539c2 | 9 | |
9f95a23c | 10 | #include "crimson/common/log.h" |
11fdf7f2 TL |
11 | #include "Errors.h" |
12 | ||
1e59de90 TL |
13 | using crimson::common::local_conf; |
14 | ||
9f95a23c | 15 | namespace crimson::net { |
11fdf7f2 TL |
16 | |
17 | namespace { | |
18 | ||
9f95a23c TL |
19 | seastar::logger& logger() { |
20 | return crimson::get_logger(ceph_subsys_ms); | |
21 | } | |
22 | ||
aee94f69 TL |
23 | using tmp_buf = seastar::temporary_buffer<char>; |
24 | using packet = seastar::net::packet; | |
25 | ||
11fdf7f2 TL |
26 | // an input_stream consumer that reads buffer segments into a bufferlist up to |
27 | // the given number of remaining bytes | |
28 | struct bufferlist_consumer { | |
29 | bufferlist& bl; | |
30 | size_t& remaining; | |
31 | ||
32 | bufferlist_consumer(bufferlist& bl, size_t& remaining) | |
33 | : bl(bl), remaining(remaining) {} | |
34 | ||
11fdf7f2 TL |
35 | using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; |
36 | ||
37 | // consume some or all of a buffer segment | |
38 | seastar::future<consumption_result_type> operator()(tmp_buf&& data) { | |
39 | if (remaining >= data.size()) { | |
40 | // consume the whole buffer | |
41 | remaining -= data.size(); | |
1e59de90 | 42 | bl.append(buffer::create(std::move(data))); |
11fdf7f2 TL |
43 | if (remaining > 0) { |
44 | // return none to request more segments | |
45 | return seastar::make_ready_future<consumption_result_type>( | |
46 | seastar::continue_consuming{}); | |
47 | } else { | |
48 | // return an empty buffer to singal that we're done | |
49 | return seastar::make_ready_future<consumption_result_type>( | |
50 | consumption_result_type::stop_consuming_type({})); | |
51 | } | |
52 | } | |
53 | if (remaining > 0) { | |
54 | // consume the front | |
1e59de90 | 55 | bl.append(buffer::create(data.share(0, remaining))); |
11fdf7f2 TL |
56 | data.trim_front(remaining); |
57 | remaining = 0; | |
58 | } | |
59 | // give the rest back to signal that we're done | |
60 | return seastar::make_ready_future<consumption_result_type>( | |
61 | consumption_result_type::stop_consuming_type{std::move(data)}); | |
62 | }; | |
63 | }; | |
64 | ||
aee94f69 TL |
65 | seastar::future<> inject_delay() |
66 | { | |
67 | if (float delay_period = local_conf()->ms_inject_internal_delays; | |
68 | delay_period) { | |
69 | logger().debug("Socket::inject_delay: sleep for {}", delay_period); | |
70 | return seastar::sleep( | |
71 | std::chrono::milliseconds((int)(delay_period * 1000.0))); | |
72 | } | |
73 | return seastar::now(); | |
74 | } | |
75 | ||
76 | void inject_failure() | |
77 | { | |
78 | if (local_conf()->ms_inject_socket_failures) { | |
79 | uint64_t rand = | |
80 | ceph::util::generate_random_number<uint64_t>(1, RAND_MAX); | |
81 | if (rand % local_conf()->ms_inject_socket_failures == 0) { | |
82 | logger().warn("Socket::inject_failure: injecting socket failure"); | |
83 | throw std::system_error(make_error_code( | |
84 | error::negotiation_failure)); | |
85 | } | |
86 | } | |
87 | } | |
88 | ||
11fdf7f2 TL |
89 | } // anonymous namespace |
90 | ||
aee94f69 TL |
91 | Socket::Socket( |
92 | seastar::connected_socket &&_socket, | |
93 | side_t _side, | |
94 | uint16_t e_port, | |
95 | construct_tag) | |
96 | : sid{seastar::this_shard_id()}, | |
97 | socket(std::move(_socket)), | |
98 | in(socket.input()), | |
99 | // the default buffer size 8192 is too small that may impact our write | |
100 | // performance. see seastar::net::connected_socket::output() | |
101 | out(socket.output(65536)), | |
102 | socket_is_shutdown(false), | |
103 | side(_side), | |
104 | ephemeral_port(e_port) | |
11fdf7f2 | 105 | { |
aee94f69 TL |
106 | if (local_conf()->ms_tcp_nodelay) { |
107 | socket.set_nodelay(true); | |
108 | } | |
109 | } | |
110 | ||
111 | Socket::~Socket() | |
112 | { | |
113 | assert(seastar::this_shard_id() == sid); | |
114 | #ifndef NDEBUG | |
115 | assert(closed); | |
116 | #endif | |
117 | } | |
118 | ||
119 | seastar::future<bufferlist> | |
120 | Socket::read(size_t bytes) | |
121 | { | |
122 | assert(seastar::this_shard_id() == sid); | |
9f95a23c TL |
123 | #ifdef UNIT_TESTS_BUILT |
124 | return try_trap_pre(next_trap_read).then([bytes, this] { | |
125 | #endif | |
126 | if (bytes == 0) { | |
127 | return seastar::make_ready_future<bufferlist>(); | |
128 | } | |
129 | r.buffer.clear(); | |
130 | r.remaining = bytes; | |
131 | return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] { | |
11fdf7f2 TL |
132 | if (r.remaining) { // throw on short reads |
133 | throw std::system_error(make_error_code(error::read_eof)); | |
134 | } | |
1e59de90 TL |
135 | inject_failure(); |
136 | return inject_delay().then([this] { | |
137 | return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); | |
138 | }); | |
11fdf7f2 | 139 | }); |
9f95a23c | 140 | #ifdef UNIT_TESTS_BUILT |
aee94f69 | 141 | }).then([this](auto buf) { |
9f95a23c | 142 | return try_trap_post(next_trap_read |
aee94f69 | 143 | ).then([buf = std::move(buf)]() mutable { |
9f95a23c TL |
144 | return std::move(buf); |
145 | }); | |
146 | }); | |
147 | #endif | |
11fdf7f2 TL |
148 | } |
149 | ||
aee94f69 | 150 | seastar::future<bufferptr> |
11fdf7f2 | 151 | Socket::read_exactly(size_t bytes) { |
aee94f69 | 152 | assert(seastar::this_shard_id() == sid); |
9f95a23c TL |
153 | #ifdef UNIT_TESTS_BUILT |
154 | return try_trap_pre(next_trap_read).then([bytes, this] { | |
155 | #endif | |
156 | if (bytes == 0) { | |
aee94f69 | 157 | return seastar::make_ready_future<bufferptr>(); |
9f95a23c | 158 | } |
20effc67 | 159 | return in.read_exactly(bytes).then([bytes](auto buf) { |
aee94f69 TL |
160 | bufferptr ptr(buffer::create(buf.share())); |
161 | if (ptr.length() < bytes) { | |
11fdf7f2 TL |
162 | throw std::system_error(make_error_code(error::read_eof)); |
163 | } | |
1e59de90 TL |
164 | inject_failure(); |
165 | return inject_delay( | |
aee94f69 TL |
166 | ).then([ptr = std::move(ptr)]() mutable { |
167 | return seastar::make_ready_future<bufferptr>(std::move(ptr)); | |
1e59de90 | 168 | }); |
11fdf7f2 | 169 | }); |
9f95a23c | 170 | #ifdef UNIT_TESTS_BUILT |
aee94f69 | 171 | }).then([this](auto ptr) { |
9f95a23c | 172 | return try_trap_post(next_trap_read |
aee94f69 TL |
173 | ).then([ptr = std::move(ptr)]() mutable { |
174 | return std::move(ptr); | |
9f95a23c TL |
175 | }); |
176 | }); | |
177 | #endif | |
178 | } | |
179 | ||
aee94f69 TL |
180 | seastar::future<> |
181 | Socket::write(bufferlist buf) | |
182 | { | |
183 | assert(seastar::this_shard_id() == sid); | |
184 | #ifdef UNIT_TESTS_BUILT | |
185 | return try_trap_pre(next_trap_write | |
186 | ).then([buf = std::move(buf), this]() mutable { | |
187 | #endif | |
188 | inject_failure(); | |
189 | return inject_delay( | |
190 | ).then([buf = std::move(buf), this]() mutable { | |
191 | packet p(std::move(buf)); | |
192 | return out.write(std::move(p)); | |
193 | }); | |
194 | #ifdef UNIT_TESTS_BUILT | |
195 | }).then([this] { | |
196 | return try_trap_post(next_trap_write); | |
197 | }); | |
198 | #endif | |
199 | } | |
200 | ||
201 | seastar::future<> | |
202 | Socket::flush() | |
203 | { | |
204 | assert(seastar::this_shard_id() == sid); | |
205 | inject_failure(); | |
206 | return inject_delay().then([this] { | |
207 | return out.flush(); | |
208 | }); | |
209 | } | |
210 | ||
211 | seastar::future<> | |
212 | Socket::write_flush(bufferlist buf) | |
213 | { | |
214 | assert(seastar::this_shard_id() == sid); | |
215 | #ifdef UNIT_TESTS_BUILT | |
216 | return try_trap_pre(next_trap_write | |
217 | ).then([buf = std::move(buf), this]() mutable { | |
218 | #endif | |
219 | inject_failure(); | |
220 | return inject_delay( | |
221 | ).then([buf = std::move(buf), this]() mutable { | |
222 | packet p(std::move(buf)); | |
223 | return out.write(std::move(p) | |
224 | ).then([this] { | |
225 | return out.flush(); | |
226 | }); | |
227 | }); | |
228 | #ifdef UNIT_TESTS_BUILT | |
229 | }).then([this] { | |
230 | return try_trap_post(next_trap_write); | |
231 | }); | |
232 | #endif | |
233 | } | |
234 | ||
235 | void Socket::shutdown() | |
236 | { | |
237 | assert(seastar::this_shard_id() == sid); | |
1e59de90 | 238 | socket_is_shutdown = true; |
9f95a23c TL |
239 | socket.shutdown_input(); |
240 | socket.shutdown_output(); | |
241 | } | |
242 | ||
243 | static inline seastar::future<> | |
244 | close_and_handle_errors(seastar::output_stream<char>& out) | |
245 | { | |
aee94f69 | 246 | return out.close().handle_exception_type([](const std::system_error& e) { |
9f95a23c TL |
247 | if (e.code() != std::errc::broken_pipe && |
248 | e.code() != std::errc::connection_reset) { | |
aee94f69 | 249 | logger().error("Socket::close(): unexpected error {}", e.what()); |
9f95a23c TL |
250 | ceph_abort(); |
251 | } | |
252 | // can happen when out is already shutdown, ignore | |
253 | }); | |
254 | } | |
255 | ||
aee94f69 TL |
256 | seastar::future<> |
257 | Socket::close() | |
258 | { | |
259 | assert(seastar::this_shard_id() == sid); | |
9f95a23c | 260 | #ifndef NDEBUG |
aee94f69 | 261 | ceph_assert_always(!closed); |
9f95a23c TL |
262 | closed = true; |
263 | #endif | |
264 | return seastar::when_all_succeed( | |
1e59de90 | 265 | inject_delay(), |
9f95a23c TL |
266 | in.close(), |
267 | close_and_handle_errors(out) | |
f67539c2 TL |
268 | ).then_unpack([] { |
269 | return seastar::make_ready_future<>(); | |
aee94f69 TL |
270 | }).handle_exception([](auto eptr) { |
271 | const char *e_what; | |
272 | try { | |
273 | std::rethrow_exception(eptr); | |
274 | } catch (std::exception &e) { | |
275 | e_what = e.what(); | |
276 | } | |
277 | logger().error("Socket::close(): unexpected exception {}", e_what); | |
9f95a23c TL |
278 | ceph_abort(); |
279 | }); | |
280 | } | |
281 | ||
aee94f69 TL |
282 | seastar::future<SocketRef> |
283 | Socket::connect(const entity_addr_t &peer_addr) | |
284 | { | |
285 | inject_failure(); | |
286 | return inject_delay( | |
287 | ).then([peer_addr] { | |
288 | return seastar::connect(peer_addr.in4_addr()); | |
289 | }).then([peer_addr](seastar::connected_socket socket) { | |
290 | auto ret = std::make_unique<Socket>( | |
291 | std::move(socket), side_t::connector, 0, construct_tag{}); | |
292 | logger().debug("Socket::connect(): connected to {}, socket {}", | |
293 | peer_addr, fmt::ptr(ret)); | |
294 | return ret; | |
295 | }); | |
1e59de90 TL |
296 | } |
297 | ||
aee94f69 TL |
298 | #ifdef UNIT_TESTS_BUILT |
299 | void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { | |
300 | assert(seastar::this_shard_id() == sid); | |
301 | blocker = blocker_; | |
302 | if (type == bp_type_t::READ) { | |
303 | ceph_assert_always(next_trap_read == bp_action_t::CONTINUE); | |
304 | next_trap_read = action; | |
305 | } else { // type == bp_type_t::WRITE | |
306 | if (next_trap_write == bp_action_t::CONTINUE) { | |
307 | next_trap_write = action; | |
308 | } else if (next_trap_write == bp_action_t::FAULT) { | |
309 | // do_sweep_messages() may combine multiple write events into one socket write | |
310 | ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); | |
311 | } else { | |
312 | ceph_abort(); | |
1e59de90 TL |
313 | } |
314 | } | |
315 | } | |
316 | ||
aee94f69 TL |
317 | seastar::future<> |
318 | Socket::try_trap_pre(bp_action_t& trap) { | |
9f95a23c TL |
319 | auto action = trap; |
320 | trap = bp_action_t::CONTINUE; | |
321 | switch (action) { | |
322 | case bp_action_t::CONTINUE: | |
323 | break; | |
324 | case bp_action_t::FAULT: | |
325 | logger().info("[Test] got FAULT"); | |
aee94f69 | 326 | throw std::system_error(make_error_code(error::negotiation_failure)); |
9f95a23c TL |
327 | case bp_action_t::BLOCK: |
328 | logger().info("[Test] got BLOCK"); | |
329 | return blocker->block(); | |
330 | case bp_action_t::STALL: | |
331 | trap = action; | |
332 | break; | |
333 | default: | |
334 | ceph_abort("unexpected action from trap"); | |
335 | } | |
f67539c2 | 336 | return seastar::make_ready_future<>(); |
9f95a23c TL |
337 | } |
338 | ||
aee94f69 TL |
339 | seastar::future<> |
340 | Socket::try_trap_post(bp_action_t& trap) { | |
9f95a23c TL |
341 | auto action = trap; |
342 | trap = bp_action_t::CONTINUE; | |
343 | switch (action) { | |
344 | case bp_action_t::CONTINUE: | |
345 | break; | |
346 | case bp_action_t::STALL: | |
347 | logger().info("[Test] got STALL and block"); | |
1e59de90 | 348 | force_shutdown(); |
9f95a23c TL |
349 | return blocker->block(); |
350 | default: | |
351 | ceph_abort("unexpected action from trap"); | |
352 | } | |
f67539c2 | 353 | return seastar::make_ready_future<>(); |
9f95a23c | 354 | } |
aee94f69 | 355 | #endif |
9f95a23c | 356 | |
aee94f69 TL |
357 | ShardedServerSocket::ShardedServerSocket( |
358 | seastar::shard_id sid, | |
359 | bool dispatch_only_on_primary_sid, | |
360 | construct_tag) | |
361 | : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid} | |
362 | { | |
363 | } | |
364 | ||
365 | ShardedServerSocket::~ShardedServerSocket() | |
366 | { | |
367 | assert(!listener); | |
368 | // detect whether user have called destroy() properly | |
369 | ceph_assert_always(!service); | |
11fdf7f2 TL |
370 | } |
371 | ||
aee94f69 TL |
372 | listen_ertr::future<> |
373 | ShardedServerSocket::listen(entity_addr_t addr) | |
f67539c2 | 374 | { |
aee94f69 TL |
375 | ceph_assert_always(seastar::this_shard_id() == primary_sid); |
376 | logger().debug("ShardedServerSocket({})::listen()...", addr); | |
377 | return this->container().invoke_on_all([addr](auto& ss) { | |
378 | ss.listen_addr = addr; | |
f67539c2 TL |
379 | seastar::socket_address s_addr(addr.in4_addr()); |
380 | seastar::listen_options lo; | |
381 | lo.reuse_address = true; | |
aee94f69 TL |
382 | if (ss.dispatch_only_on_primary_sid) { |
383 | lo.set_fixed_cpu(ss.primary_sid); | |
384 | } | |
f67539c2 TL |
385 | ss.listener = seastar::listen(s_addr, lo); |
386 | }).then([] { | |
20effc67 TL |
387 | return listen_ertr::now(); |
388 | }).handle_exception_type( | |
aee94f69 | 389 | [addr](const std::system_error& e) -> listen_ertr::future<> { |
f67539c2 | 390 | if (e.code() == std::errc::address_in_use) { |
aee94f69 | 391 | logger().debug("ShardedServerSocket({})::listen(): address in use", addr); |
f67539c2 | 392 | return crimson::ct_error::address_in_use::make(); |
20effc67 | 393 | } else if (e.code() == std::errc::address_not_available) { |
aee94f69 | 394 | logger().debug("ShardedServerSocket({})::listen(): address not available", |
20effc67 TL |
395 | addr); |
396 | return crimson::ct_error::address_not_available::make(); | |
f67539c2 | 397 | } |
aee94f69 TL |
398 | logger().error("ShardedServerSocket({})::listen(): " |
399 | "got unexpeted error {}", addr, e.what()); | |
20effc67 | 400 | ceph_abort(); |
f67539c2 TL |
401 | }); |
402 | } | |
403 | ||
aee94f69 TL |
404 | seastar::future<> |
405 | ShardedServerSocket::accept(accept_func_t &&_fn_accept) | |
f67539c2 | 406 | { |
aee94f69 TL |
407 | ceph_assert_always(seastar::this_shard_id() == primary_sid); |
408 | logger().debug("ShardedServerSocket({})::accept()...", listen_addr); | |
409 | return this->container().invoke_on_all([_fn_accept](auto &ss) { | |
410 | assert(ss.listener); | |
411 | ss.fn_accept = _fn_accept; | |
412 | // gate accepting | |
413 | // ShardedServerSocket::shutdown() will drain the continuations in the gate | |
414 | // so ignore the returned future | |
415 | std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] { | |
416 | return seastar::keep_doing([&ss] { | |
417 | return ss.listener->accept( | |
418 | ).then([&ss](seastar::accept_result accept_result) { | |
419 | #ifndef NDEBUG | |
420 | if (ss.dispatch_only_on_primary_sid) { | |
421 | // see seastar::listen_options::set_fixed_cpu() | |
422 | ceph_assert_always(seastar::this_shard_id() == ss.primary_sid); | |
423 | } | |
424 | #endif | |
425 | auto [socket, paddr] = std::move(accept_result); | |
426 | entity_addr_t peer_addr; | |
427 | peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); | |
428 | peer_addr.set_type(ss.listen_addr.get_type()); | |
429 | SocketRef _socket = std::make_unique<Socket>( | |
430 | std::move(socket), Socket::side_t::acceptor, | |
431 | peer_addr.get_port(), Socket::construct_tag{}); | |
432 | logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, " | |
433 | "socket {}, dispatch_only_on_primary_sid = {}", | |
434 | ss.listen_addr, peer_addr, fmt::ptr(_socket), | |
435 | ss.dispatch_only_on_primary_sid); | |
436 | std::ignore = seastar::with_gate( | |
437 | ss.shutdown_gate, | |
438 | [socket=std::move(_socket), peer_addr, &ss]() mutable { | |
439 | return ss.fn_accept(std::move(socket), peer_addr | |
440 | ).handle_exception([&ss, peer_addr](auto eptr) { | |
441 | const char *e_what; | |
442 | try { | |
443 | std::rethrow_exception(eptr); | |
444 | } catch (std::exception &e) { | |
445 | e_what = e.what(); | |
446 | } | |
447 | logger().error("ShardedServerSocket({})::accept(): " | |
448 | "fn_accept(s, {}) got unexpected exception {}", | |
449 | ss.listen_addr, peer_addr, e_what); | |
450 | ceph_abort(); | |
451 | }); | |
452 | }); | |
453 | }); | |
454 | }).handle_exception_type([&ss](const std::system_error& e) { | |
455 | if (e.code() == std::errc::connection_aborted || | |
456 | e.code() == std::errc::invalid_argument) { | |
457 | logger().debug("ShardedServerSocket({})::accept(): stopped ({})", | |
458 | ss.listen_addr, e.what()); | |
459 | } else { | |
460 | throw; | |
461 | } | |
462 | }).handle_exception([&ss](auto eptr) { | |
463 | const char *e_what; | |
464 | try { | |
465 | std::rethrow_exception(eptr); | |
466 | } catch (std::exception &e) { | |
467 | e_what = e.what(); | |
468 | } | |
469 | logger().error("ShardedServerSocket({})::accept(): " | |
470 | "got unexpected exception {}", ss.listen_addr, e_what); | |
471 | ceph_abort(); | |
472 | }); | |
473 | }); | |
474 | }); | |
475 | } | |
476 | ||
477 | seastar::future<> | |
478 | ShardedServerSocket::shutdown_destroy() | |
479 | { | |
480 | assert(seastar::this_shard_id() == primary_sid); | |
481 | logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr); | |
482 | // shutdown shards | |
483 | return this->container().invoke_on_all([](auto& ss) { | |
f67539c2 TL |
484 | if (ss.listener) { |
485 | ss.listener->abort_accept(); | |
486 | } | |
487 | return ss.shutdown_gate.close(); | |
488 | }).then([this] { | |
aee94f69 TL |
489 | // destroy shards |
490 | return this->container().invoke_on_all([](auto& ss) { | |
491 | assert(ss.shutdown_gate.is_closed()); | |
492 | ss.listen_addr = entity_addr_t(); | |
493 | ss.listener.reset(); | |
494 | }); | |
495 | }).then([this] { | |
496 | // stop the sharded service: we should only construct/stop shards on #0 | |
497 | return this->container().invoke_on(0, [](auto& ss) { | |
f67539c2 TL |
498 | assert(ss.service); |
499 | return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); | |
500 | }); | |
501 | }); | |
502 | } | |
503 | ||
aee94f69 TL |
504 | seastar::future<ShardedServerSocket*> |
505 | ShardedServerSocket::create(bool dispatch_only_on_this_shard) | |
f67539c2 | 506 | { |
aee94f69 TL |
507 | auto primary_sid = seastar::this_shard_id(); |
508 | // start the sharded service: we should only construct/stop shards on #0 | |
509 | return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] { | |
f67539c2 | 510 | auto service = std::make_unique<sharded_service_t>(); |
aee94f69 TL |
511 | return service->start( |
512 | primary_sid, dispatch_only_on_this_shard, construct_tag{} | |
513 | ).then([service = std::move(service)]() mutable { | |
f67539c2 TL |
514 | auto p_shard = service.get(); |
515 | p_shard->local().service = std::move(service); | |
516 | return p_shard; | |
517 | }); | |
aee94f69 | 518 | }).then([](auto p_shard) { |
f67539c2 TL |
519 | return &p_shard->local(); |
520 | }); | |
521 | } | |
522 | ||
9f95a23c | 523 | } // namespace crimson::net |