]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/Socket.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / Socket.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "Socket.h"
5
1e59de90 6#include <seastar/core/sleep.hh>
f67539c2 7#include <seastar/core/when_all.hh>
aee94f69 8#include <seastar/net/packet.hh>
f67539c2 9
9f95a23c 10#include "crimson/common/log.h"
11fdf7f2
TL
11#include "Errors.h"
12
1e59de90
TL
13using crimson::common::local_conf;
14
9f95a23c 15namespace crimson::net {
11fdf7f2
TL
16
17namespace {
18
9f95a23c
TL
19seastar::logger& logger() {
20 return crimson::get_logger(ceph_subsys_ms);
21}
22
aee94f69
TL
23using tmp_buf = seastar::temporary_buffer<char>;
24using packet = seastar::net::packet;
25
11fdf7f2
TL
26// an input_stream consumer that reads buffer segments into a bufferlist up to
27// the given number of remaining bytes
28struct bufferlist_consumer {
29 bufferlist& bl;
30 size_t& remaining;
31
32 bufferlist_consumer(bufferlist& bl, size_t& remaining)
33 : bl(bl), remaining(remaining) {}
34
11fdf7f2
TL
35 using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
36
37 // consume some or all of a buffer segment
38 seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
39 if (remaining >= data.size()) {
40 // consume the whole buffer
41 remaining -= data.size();
1e59de90 42 bl.append(buffer::create(std::move(data)));
11fdf7f2
TL
43 if (remaining > 0) {
44 // return none to request more segments
45 return seastar::make_ready_future<consumption_result_type>(
46 seastar::continue_consuming{});
47 } else {
48 // return an empty buffer to singal that we're done
49 return seastar::make_ready_future<consumption_result_type>(
50 consumption_result_type::stop_consuming_type({}));
51 }
52 }
53 if (remaining > 0) {
54 // consume the front
1e59de90 55 bl.append(buffer::create(data.share(0, remaining)));
11fdf7f2
TL
56 data.trim_front(remaining);
57 remaining = 0;
58 }
59 // give the rest back to signal that we're done
60 return seastar::make_ready_future<consumption_result_type>(
61 consumption_result_type::stop_consuming_type{std::move(data)});
62 };
63};
64
aee94f69
TL
65seastar::future<> inject_delay()
66{
67 if (float delay_period = local_conf()->ms_inject_internal_delays;
68 delay_period) {
69 logger().debug("Socket::inject_delay: sleep for {}", delay_period);
70 return seastar::sleep(
71 std::chrono::milliseconds((int)(delay_period * 1000.0)));
72 }
73 return seastar::now();
74}
75
76void inject_failure()
77{
78 if (local_conf()->ms_inject_socket_failures) {
79 uint64_t rand =
80 ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
81 if (rand % local_conf()->ms_inject_socket_failures == 0) {
82 logger().warn("Socket::inject_failure: injecting socket failure");
83 throw std::system_error(make_error_code(
84 error::negotiation_failure));
85 }
86 }
87}
88
11fdf7f2
TL
89} // anonymous namespace
90
aee94f69
TL
91Socket::Socket(
92 seastar::connected_socket &&_socket,
93 side_t _side,
94 uint16_t e_port,
95 construct_tag)
96 : sid{seastar::this_shard_id()},
97 socket(std::move(_socket)),
98 in(socket.input()),
99 // the default buffer size 8192 is too small that may impact our write
100 // performance. see seastar::net::connected_socket::output()
101 out(socket.output(65536)),
102 socket_is_shutdown(false),
103 side(_side),
104 ephemeral_port(e_port)
11fdf7f2 105{
aee94f69
TL
106 if (local_conf()->ms_tcp_nodelay) {
107 socket.set_nodelay(true);
108 }
109}
110
111Socket::~Socket()
112{
113 assert(seastar::this_shard_id() == sid);
114#ifndef NDEBUG
115 assert(closed);
116#endif
117}
118
119seastar::future<bufferlist>
120Socket::read(size_t bytes)
121{
122 assert(seastar::this_shard_id() == sid);
9f95a23c
TL
123#ifdef UNIT_TESTS_BUILT
124 return try_trap_pre(next_trap_read).then([bytes, this] {
125#endif
126 if (bytes == 0) {
127 return seastar::make_ready_future<bufferlist>();
128 }
129 r.buffer.clear();
130 r.remaining = bytes;
131 return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
11fdf7f2
TL
132 if (r.remaining) { // throw on short reads
133 throw std::system_error(make_error_code(error::read_eof));
134 }
1e59de90
TL
135 inject_failure();
136 return inject_delay().then([this] {
137 return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
138 });
11fdf7f2 139 });
9f95a23c 140#ifdef UNIT_TESTS_BUILT
aee94f69 141 }).then([this](auto buf) {
9f95a23c 142 return try_trap_post(next_trap_read
aee94f69 143 ).then([buf = std::move(buf)]() mutable {
9f95a23c
TL
144 return std::move(buf);
145 });
146 });
147#endif
11fdf7f2
TL
148}
149
aee94f69 150seastar::future<bufferptr>
11fdf7f2 151Socket::read_exactly(size_t bytes) {
aee94f69 152 assert(seastar::this_shard_id() == sid);
9f95a23c
TL
153#ifdef UNIT_TESTS_BUILT
154 return try_trap_pre(next_trap_read).then([bytes, this] {
155#endif
156 if (bytes == 0) {
aee94f69 157 return seastar::make_ready_future<bufferptr>();
9f95a23c 158 }
20effc67 159 return in.read_exactly(bytes).then([bytes](auto buf) {
aee94f69
TL
160 bufferptr ptr(buffer::create(buf.share()));
161 if (ptr.length() < bytes) {
11fdf7f2
TL
162 throw std::system_error(make_error_code(error::read_eof));
163 }
1e59de90
TL
164 inject_failure();
165 return inject_delay(
aee94f69
TL
166 ).then([ptr = std::move(ptr)]() mutable {
167 return seastar::make_ready_future<bufferptr>(std::move(ptr));
1e59de90 168 });
11fdf7f2 169 });
9f95a23c 170#ifdef UNIT_TESTS_BUILT
aee94f69 171 }).then([this](auto ptr) {
9f95a23c 172 return try_trap_post(next_trap_read
aee94f69
TL
173 ).then([ptr = std::move(ptr)]() mutable {
174 return std::move(ptr);
9f95a23c
TL
175 });
176 });
177#endif
178}
179
aee94f69
TL
180seastar::future<>
181Socket::write(bufferlist buf)
182{
183 assert(seastar::this_shard_id() == sid);
184#ifdef UNIT_TESTS_BUILT
185 return try_trap_pre(next_trap_write
186 ).then([buf = std::move(buf), this]() mutable {
187#endif
188 inject_failure();
189 return inject_delay(
190 ).then([buf = std::move(buf), this]() mutable {
191 packet p(std::move(buf));
192 return out.write(std::move(p));
193 });
194#ifdef UNIT_TESTS_BUILT
195 }).then([this] {
196 return try_trap_post(next_trap_write);
197 });
198#endif
199}
200
201seastar::future<>
202Socket::flush()
203{
204 assert(seastar::this_shard_id() == sid);
205 inject_failure();
206 return inject_delay().then([this] {
207 return out.flush();
208 });
209}
210
211seastar::future<>
212Socket::write_flush(bufferlist buf)
213{
214 assert(seastar::this_shard_id() == sid);
215#ifdef UNIT_TESTS_BUILT
216 return try_trap_pre(next_trap_write
217 ).then([buf = std::move(buf), this]() mutable {
218#endif
219 inject_failure();
220 return inject_delay(
221 ).then([buf = std::move(buf), this]() mutable {
222 packet p(std::move(buf));
223 return out.write(std::move(p)
224 ).then([this] {
225 return out.flush();
226 });
227 });
228#ifdef UNIT_TESTS_BUILT
229 }).then([this] {
230 return try_trap_post(next_trap_write);
231 });
232#endif
233}
234
235void Socket::shutdown()
236{
237 assert(seastar::this_shard_id() == sid);
1e59de90 238 socket_is_shutdown = true;
9f95a23c
TL
239 socket.shutdown_input();
240 socket.shutdown_output();
241}
242
243static inline seastar::future<>
244close_and_handle_errors(seastar::output_stream<char>& out)
245{
aee94f69 246 return out.close().handle_exception_type([](const std::system_error& e) {
9f95a23c
TL
247 if (e.code() != std::errc::broken_pipe &&
248 e.code() != std::errc::connection_reset) {
aee94f69 249 logger().error("Socket::close(): unexpected error {}", e.what());
9f95a23c
TL
250 ceph_abort();
251 }
252 // can happen when out is already shutdown, ignore
253 });
254}
255
aee94f69
TL
256seastar::future<>
257Socket::close()
258{
259 assert(seastar::this_shard_id() == sid);
9f95a23c 260#ifndef NDEBUG
aee94f69 261 ceph_assert_always(!closed);
9f95a23c
TL
262 closed = true;
263#endif
264 return seastar::when_all_succeed(
1e59de90 265 inject_delay(),
9f95a23c
TL
266 in.close(),
267 close_and_handle_errors(out)
f67539c2
TL
268 ).then_unpack([] {
269 return seastar::make_ready_future<>();
aee94f69
TL
270 }).handle_exception([](auto eptr) {
271 const char *e_what;
272 try {
273 std::rethrow_exception(eptr);
274 } catch (std::exception &e) {
275 e_what = e.what();
276 }
277 logger().error("Socket::close(): unexpected exception {}", e_what);
9f95a23c
TL
278 ceph_abort();
279 });
280}
281
aee94f69
TL
282seastar::future<SocketRef>
283Socket::connect(const entity_addr_t &peer_addr)
284{
285 inject_failure();
286 return inject_delay(
287 ).then([peer_addr] {
288 return seastar::connect(peer_addr.in4_addr());
289 }).then([peer_addr](seastar::connected_socket socket) {
290 auto ret = std::make_unique<Socket>(
291 std::move(socket), side_t::connector, 0, construct_tag{});
292 logger().debug("Socket::connect(): connected to {}, socket {}",
293 peer_addr, fmt::ptr(ret));
294 return ret;
295 });
1e59de90
TL
296}
297
aee94f69
TL
298#ifdef UNIT_TESTS_BUILT
299void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
300 assert(seastar::this_shard_id() == sid);
301 blocker = blocker_;
302 if (type == bp_type_t::READ) {
303 ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
304 next_trap_read = action;
305 } else { // type == bp_type_t::WRITE
306 if (next_trap_write == bp_action_t::CONTINUE) {
307 next_trap_write = action;
308 } else if (next_trap_write == bp_action_t::FAULT) {
309 // do_sweep_messages() may combine multiple write events into one socket write
310 ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
311 } else {
312 ceph_abort();
1e59de90
TL
313 }
314 }
315}
316
aee94f69
TL
317seastar::future<>
318Socket::try_trap_pre(bp_action_t& trap) {
9f95a23c
TL
319 auto action = trap;
320 trap = bp_action_t::CONTINUE;
321 switch (action) {
322 case bp_action_t::CONTINUE:
323 break;
324 case bp_action_t::FAULT:
325 logger().info("[Test] got FAULT");
aee94f69 326 throw std::system_error(make_error_code(error::negotiation_failure));
9f95a23c
TL
327 case bp_action_t::BLOCK:
328 logger().info("[Test] got BLOCK");
329 return blocker->block();
330 case bp_action_t::STALL:
331 trap = action;
332 break;
333 default:
334 ceph_abort("unexpected action from trap");
335 }
f67539c2 336 return seastar::make_ready_future<>();
9f95a23c
TL
337}
338
aee94f69
TL
339seastar::future<>
340Socket::try_trap_post(bp_action_t& trap) {
9f95a23c
TL
341 auto action = trap;
342 trap = bp_action_t::CONTINUE;
343 switch (action) {
344 case bp_action_t::CONTINUE:
345 break;
346 case bp_action_t::STALL:
347 logger().info("[Test] got STALL and block");
1e59de90 348 force_shutdown();
9f95a23c
TL
349 return blocker->block();
350 default:
351 ceph_abort("unexpected action from trap");
352 }
f67539c2 353 return seastar::make_ready_future<>();
9f95a23c 354}
aee94f69 355#endif
9f95a23c 356
aee94f69
TL
357ShardedServerSocket::ShardedServerSocket(
358 seastar::shard_id sid,
359 bool dispatch_only_on_primary_sid,
360 construct_tag)
361 : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid}
362{
363}
364
365ShardedServerSocket::~ShardedServerSocket()
366{
367 assert(!listener);
368 // detect whether user have called destroy() properly
369 ceph_assert_always(!service);
11fdf7f2
TL
370}
371
aee94f69
TL
372listen_ertr::future<>
373ShardedServerSocket::listen(entity_addr_t addr)
f67539c2 374{
aee94f69
TL
375 ceph_assert_always(seastar::this_shard_id() == primary_sid);
376 logger().debug("ShardedServerSocket({})::listen()...", addr);
377 return this->container().invoke_on_all([addr](auto& ss) {
378 ss.listen_addr = addr;
f67539c2
TL
379 seastar::socket_address s_addr(addr.in4_addr());
380 seastar::listen_options lo;
381 lo.reuse_address = true;
aee94f69
TL
382 if (ss.dispatch_only_on_primary_sid) {
383 lo.set_fixed_cpu(ss.primary_sid);
384 }
f67539c2
TL
385 ss.listener = seastar::listen(s_addr, lo);
386 }).then([] {
20effc67
TL
387 return listen_ertr::now();
388 }).handle_exception_type(
aee94f69 389 [addr](const std::system_error& e) -> listen_ertr::future<> {
f67539c2 390 if (e.code() == std::errc::address_in_use) {
aee94f69 391 logger().debug("ShardedServerSocket({})::listen(): address in use", addr);
f67539c2 392 return crimson::ct_error::address_in_use::make();
20effc67 393 } else if (e.code() == std::errc::address_not_available) {
aee94f69 394 logger().debug("ShardedServerSocket({})::listen(): address not available",
20effc67
TL
395 addr);
396 return crimson::ct_error::address_not_available::make();
f67539c2 397 }
aee94f69
TL
398 logger().error("ShardedServerSocket({})::listen(): "
399 "got unexpeted error {}", addr, e.what());
20effc67 400 ceph_abort();
f67539c2
TL
401 });
402}
403
aee94f69
TL
404seastar::future<>
405ShardedServerSocket::accept(accept_func_t &&_fn_accept)
f67539c2 406{
aee94f69
TL
407 ceph_assert_always(seastar::this_shard_id() == primary_sid);
408 logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
409 return this->container().invoke_on_all([_fn_accept](auto &ss) {
410 assert(ss.listener);
411 ss.fn_accept = _fn_accept;
412 // gate accepting
413 // ShardedServerSocket::shutdown() will drain the continuations in the gate
414 // so ignore the returned future
415 std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
416 return seastar::keep_doing([&ss] {
417 return ss.listener->accept(
418 ).then([&ss](seastar::accept_result accept_result) {
419#ifndef NDEBUG
420 if (ss.dispatch_only_on_primary_sid) {
421 // see seastar::listen_options::set_fixed_cpu()
422 ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
423 }
424#endif
425 auto [socket, paddr] = std::move(accept_result);
426 entity_addr_t peer_addr;
427 peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
428 peer_addr.set_type(ss.listen_addr.get_type());
429 SocketRef _socket = std::make_unique<Socket>(
430 std::move(socket), Socket::side_t::acceptor,
431 peer_addr.get_port(), Socket::construct_tag{});
432 logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, "
433 "socket {}, dispatch_only_on_primary_sid = {}",
434 ss.listen_addr, peer_addr, fmt::ptr(_socket),
435 ss.dispatch_only_on_primary_sid);
436 std::ignore = seastar::with_gate(
437 ss.shutdown_gate,
438 [socket=std::move(_socket), peer_addr, &ss]() mutable {
439 return ss.fn_accept(std::move(socket), peer_addr
440 ).handle_exception([&ss, peer_addr](auto eptr) {
441 const char *e_what;
442 try {
443 std::rethrow_exception(eptr);
444 } catch (std::exception &e) {
445 e_what = e.what();
446 }
447 logger().error("ShardedServerSocket({})::accept(): "
448 "fn_accept(s, {}) got unexpected exception {}",
449 ss.listen_addr, peer_addr, e_what);
450 ceph_abort();
451 });
452 });
453 });
454 }).handle_exception_type([&ss](const std::system_error& e) {
455 if (e.code() == std::errc::connection_aborted ||
456 e.code() == std::errc::invalid_argument) {
457 logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
458 ss.listen_addr, e.what());
459 } else {
460 throw;
461 }
462 }).handle_exception([&ss](auto eptr) {
463 const char *e_what;
464 try {
465 std::rethrow_exception(eptr);
466 } catch (std::exception &e) {
467 e_what = e.what();
468 }
469 logger().error("ShardedServerSocket({})::accept(): "
470 "got unexpected exception {}", ss.listen_addr, e_what);
471 ceph_abort();
472 });
473 });
474 });
475}
476
477seastar::future<>
478ShardedServerSocket::shutdown_destroy()
479{
480 assert(seastar::this_shard_id() == primary_sid);
481 logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
482 // shutdown shards
483 return this->container().invoke_on_all([](auto& ss) {
f67539c2
TL
484 if (ss.listener) {
485 ss.listener->abort_accept();
486 }
487 return ss.shutdown_gate.close();
488 }).then([this] {
aee94f69
TL
489 // destroy shards
490 return this->container().invoke_on_all([](auto& ss) {
491 assert(ss.shutdown_gate.is_closed());
492 ss.listen_addr = entity_addr_t();
493 ss.listener.reset();
494 });
495 }).then([this] {
496 // stop the sharded service: we should only construct/stop shards on #0
497 return this->container().invoke_on(0, [](auto& ss) {
f67539c2
TL
498 assert(ss.service);
499 return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
500 });
501 });
502}
503
aee94f69
TL
504seastar::future<ShardedServerSocket*>
505ShardedServerSocket::create(bool dispatch_only_on_this_shard)
f67539c2 506{
aee94f69
TL
507 auto primary_sid = seastar::this_shard_id();
508 // start the sharded service: we should only construct/stop shards on #0
509 return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] {
f67539c2 510 auto service = std::make_unique<sharded_service_t>();
aee94f69
TL
511 return service->start(
512 primary_sid, dispatch_only_on_this_shard, construct_tag{}
513 ).then([service = std::move(service)]() mutable {
f67539c2
TL
514 auto p_shard = service.get();
515 p_shard->local().service = std::move(service);
516 return p_shard;
517 });
aee94f69 518 }).then([](auto p_shard) {
f67539c2
TL
519 return &p_shard->local();
520 });
521}
522
9f95a23c 523} // namespace crimson::net