]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/Socket.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / Socket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "Socket.h"
5
6 #include <seastar/core/sleep.hh>
7 #include <seastar/core/when_all.hh>
8 #include <seastar/net/packet.hh>
9
10 #include "crimson/common/log.h"
11 #include "Errors.h"
12
13 using crimson::common::local_conf;
14
15 namespace crimson::net {
16
17 namespace {
18
19 seastar::logger& logger() {
20 return crimson::get_logger(ceph_subsys_ms);
21 }
22
23 using tmp_buf = seastar::temporary_buffer<char>;
24 using packet = seastar::net::packet;
25
26 // an input_stream consumer that reads buffer segments into a bufferlist up to
27 // the given number of remaining bytes
28 struct bufferlist_consumer {
29 bufferlist& bl;
30 size_t& remaining;
31
32 bufferlist_consumer(bufferlist& bl, size_t& remaining)
33 : bl(bl), remaining(remaining) {}
34
35 using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
36
37 // consume some or all of a buffer segment
38 seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
39 if (remaining >= data.size()) {
40 // consume the whole buffer
41 remaining -= data.size();
42 bl.append(buffer::create(std::move(data)));
43 if (remaining > 0) {
44 // return none to request more segments
45 return seastar::make_ready_future<consumption_result_type>(
46 seastar::continue_consuming{});
47 } else {
48 // return an empty buffer to singal that we're done
49 return seastar::make_ready_future<consumption_result_type>(
50 consumption_result_type::stop_consuming_type({}));
51 }
52 }
53 if (remaining > 0) {
54 // consume the front
55 bl.append(buffer::create(data.share(0, remaining)));
56 data.trim_front(remaining);
57 remaining = 0;
58 }
59 // give the rest back to signal that we're done
60 return seastar::make_ready_future<consumption_result_type>(
61 consumption_result_type::stop_consuming_type{std::move(data)});
62 };
63 };
64
65 seastar::future<> inject_delay()
66 {
67 if (float delay_period = local_conf()->ms_inject_internal_delays;
68 delay_period) {
69 logger().debug("Socket::inject_delay: sleep for {}", delay_period);
70 return seastar::sleep(
71 std::chrono::milliseconds((int)(delay_period * 1000.0)));
72 }
73 return seastar::now();
74 }
75
76 void inject_failure()
77 {
78 if (local_conf()->ms_inject_socket_failures) {
79 uint64_t rand =
80 ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
81 if (rand % local_conf()->ms_inject_socket_failures == 0) {
82 logger().warn("Socket::inject_failure: injecting socket failure");
83 throw std::system_error(make_error_code(
84 error::negotiation_failure));
85 }
86 }
87 }
88
89 } // anonymous namespace
90
91 Socket::Socket(
92 seastar::connected_socket &&_socket,
93 side_t _side,
94 uint16_t e_port,
95 construct_tag)
96 : sid{seastar::this_shard_id()},
97 socket(std::move(_socket)),
98 in(socket.input()),
99 // the default buffer size 8192 is too small that may impact our write
100 // performance. see seastar::net::connected_socket::output()
101 out(socket.output(65536)),
102 socket_is_shutdown(false),
103 side(_side),
104 ephemeral_port(e_port)
105 {
106 if (local_conf()->ms_tcp_nodelay) {
107 socket.set_nodelay(true);
108 }
109 }
110
111 Socket::~Socket()
112 {
113 assert(seastar::this_shard_id() == sid);
114 #ifndef NDEBUG
115 assert(closed);
116 #endif
117 }
118
119 seastar::future<bufferlist>
120 Socket::read(size_t bytes)
121 {
122 assert(seastar::this_shard_id() == sid);
123 #ifdef UNIT_TESTS_BUILT
124 return try_trap_pre(next_trap_read).then([bytes, this] {
125 #endif
126 if (bytes == 0) {
127 return seastar::make_ready_future<bufferlist>();
128 }
129 r.buffer.clear();
130 r.remaining = bytes;
131 return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
132 if (r.remaining) { // throw on short reads
133 throw std::system_error(make_error_code(error::read_eof));
134 }
135 inject_failure();
136 return inject_delay().then([this] {
137 return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
138 });
139 });
140 #ifdef UNIT_TESTS_BUILT
141 }).then([this](auto buf) {
142 return try_trap_post(next_trap_read
143 ).then([buf = std::move(buf)]() mutable {
144 return std::move(buf);
145 });
146 });
147 #endif
148 }
149
150 seastar::future<bufferptr>
151 Socket::read_exactly(size_t bytes) {
152 assert(seastar::this_shard_id() == sid);
153 #ifdef UNIT_TESTS_BUILT
154 return try_trap_pre(next_trap_read).then([bytes, this] {
155 #endif
156 if (bytes == 0) {
157 return seastar::make_ready_future<bufferptr>();
158 }
159 return in.read_exactly(bytes).then([bytes](auto buf) {
160 bufferptr ptr(buffer::create(buf.share()));
161 if (ptr.length() < bytes) {
162 throw std::system_error(make_error_code(error::read_eof));
163 }
164 inject_failure();
165 return inject_delay(
166 ).then([ptr = std::move(ptr)]() mutable {
167 return seastar::make_ready_future<bufferptr>(std::move(ptr));
168 });
169 });
170 #ifdef UNIT_TESTS_BUILT
171 }).then([this](auto ptr) {
172 return try_trap_post(next_trap_read
173 ).then([ptr = std::move(ptr)]() mutable {
174 return std::move(ptr);
175 });
176 });
177 #endif
178 }
179
180 seastar::future<>
181 Socket::write(bufferlist buf)
182 {
183 assert(seastar::this_shard_id() == sid);
184 #ifdef UNIT_TESTS_BUILT
185 return try_trap_pre(next_trap_write
186 ).then([buf = std::move(buf), this]() mutable {
187 #endif
188 inject_failure();
189 return inject_delay(
190 ).then([buf = std::move(buf), this]() mutable {
191 packet p(std::move(buf));
192 return out.write(std::move(p));
193 });
194 #ifdef UNIT_TESTS_BUILT
195 }).then([this] {
196 return try_trap_post(next_trap_write);
197 });
198 #endif
199 }
200
201 seastar::future<>
202 Socket::flush()
203 {
204 assert(seastar::this_shard_id() == sid);
205 inject_failure();
206 return inject_delay().then([this] {
207 return out.flush();
208 });
209 }
210
211 seastar::future<>
212 Socket::write_flush(bufferlist buf)
213 {
214 assert(seastar::this_shard_id() == sid);
215 #ifdef UNIT_TESTS_BUILT
216 return try_trap_pre(next_trap_write
217 ).then([buf = std::move(buf), this]() mutable {
218 #endif
219 inject_failure();
220 return inject_delay(
221 ).then([buf = std::move(buf), this]() mutable {
222 packet p(std::move(buf));
223 return out.write(std::move(p)
224 ).then([this] {
225 return out.flush();
226 });
227 });
228 #ifdef UNIT_TESTS_BUILT
229 }).then([this] {
230 return try_trap_post(next_trap_write);
231 });
232 #endif
233 }
234
235 void Socket::shutdown()
236 {
237 assert(seastar::this_shard_id() == sid);
238 socket_is_shutdown = true;
239 socket.shutdown_input();
240 socket.shutdown_output();
241 }
242
243 static inline seastar::future<>
244 close_and_handle_errors(seastar::output_stream<char>& out)
245 {
246 return out.close().handle_exception_type([](const std::system_error& e) {
247 if (e.code() != std::errc::broken_pipe &&
248 e.code() != std::errc::connection_reset) {
249 logger().error("Socket::close(): unexpected error {}", e.what());
250 ceph_abort();
251 }
252 // can happen when out is already shutdown, ignore
253 });
254 }
255
256 seastar::future<>
257 Socket::close()
258 {
259 assert(seastar::this_shard_id() == sid);
260 #ifndef NDEBUG
261 ceph_assert_always(!closed);
262 closed = true;
263 #endif
264 return seastar::when_all_succeed(
265 inject_delay(),
266 in.close(),
267 close_and_handle_errors(out)
268 ).then_unpack([] {
269 return seastar::make_ready_future<>();
270 }).handle_exception([](auto eptr) {
271 const char *e_what;
272 try {
273 std::rethrow_exception(eptr);
274 } catch (std::exception &e) {
275 e_what = e.what();
276 }
277 logger().error("Socket::close(): unexpected exception {}", e_what);
278 ceph_abort();
279 });
280 }
281
282 seastar::future<SocketRef>
283 Socket::connect(const entity_addr_t &peer_addr)
284 {
285 inject_failure();
286 return inject_delay(
287 ).then([peer_addr] {
288 return seastar::connect(peer_addr.in4_addr());
289 }).then([peer_addr](seastar::connected_socket socket) {
290 auto ret = std::make_unique<Socket>(
291 std::move(socket), side_t::connector, 0, construct_tag{});
292 logger().debug("Socket::connect(): connected to {}, socket {}",
293 peer_addr, fmt::ptr(ret));
294 return ret;
295 });
296 }
297
298 #ifdef UNIT_TESTS_BUILT
299 void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
300 assert(seastar::this_shard_id() == sid);
301 blocker = blocker_;
302 if (type == bp_type_t::READ) {
303 ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
304 next_trap_read = action;
305 } else { // type == bp_type_t::WRITE
306 if (next_trap_write == bp_action_t::CONTINUE) {
307 next_trap_write = action;
308 } else if (next_trap_write == bp_action_t::FAULT) {
309 // do_sweep_messages() may combine multiple write events into one socket write
310 ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
311 } else {
312 ceph_abort();
313 }
314 }
315 }
316
317 seastar::future<>
318 Socket::try_trap_pre(bp_action_t& trap) {
319 auto action = trap;
320 trap = bp_action_t::CONTINUE;
321 switch (action) {
322 case bp_action_t::CONTINUE:
323 break;
324 case bp_action_t::FAULT:
325 logger().info("[Test] got FAULT");
326 throw std::system_error(make_error_code(error::negotiation_failure));
327 case bp_action_t::BLOCK:
328 logger().info("[Test] got BLOCK");
329 return blocker->block();
330 case bp_action_t::STALL:
331 trap = action;
332 break;
333 default:
334 ceph_abort("unexpected action from trap");
335 }
336 return seastar::make_ready_future<>();
337 }
338
339 seastar::future<>
340 Socket::try_trap_post(bp_action_t& trap) {
341 auto action = trap;
342 trap = bp_action_t::CONTINUE;
343 switch (action) {
344 case bp_action_t::CONTINUE:
345 break;
346 case bp_action_t::STALL:
347 logger().info("[Test] got STALL and block");
348 force_shutdown();
349 return blocker->block();
350 default:
351 ceph_abort("unexpected action from trap");
352 }
353 return seastar::make_ready_future<>();
354 }
355 #endif
356
357 ShardedServerSocket::ShardedServerSocket(
358 seastar::shard_id sid,
359 bool dispatch_only_on_primary_sid,
360 construct_tag)
361 : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid}
362 {
363 }
364
365 ShardedServerSocket::~ShardedServerSocket()
366 {
367 assert(!listener);
368 // detect whether user have called destroy() properly
369 ceph_assert_always(!service);
370 }
371
372 listen_ertr::future<>
373 ShardedServerSocket::listen(entity_addr_t addr)
374 {
375 ceph_assert_always(seastar::this_shard_id() == primary_sid);
376 logger().debug("ShardedServerSocket({})::listen()...", addr);
377 return this->container().invoke_on_all([addr](auto& ss) {
378 ss.listen_addr = addr;
379 seastar::socket_address s_addr(addr.in4_addr());
380 seastar::listen_options lo;
381 lo.reuse_address = true;
382 if (ss.dispatch_only_on_primary_sid) {
383 lo.set_fixed_cpu(ss.primary_sid);
384 }
385 ss.listener = seastar::listen(s_addr, lo);
386 }).then([] {
387 return listen_ertr::now();
388 }).handle_exception_type(
389 [addr](const std::system_error& e) -> listen_ertr::future<> {
390 if (e.code() == std::errc::address_in_use) {
391 logger().debug("ShardedServerSocket({})::listen(): address in use", addr);
392 return crimson::ct_error::address_in_use::make();
393 } else if (e.code() == std::errc::address_not_available) {
394 logger().debug("ShardedServerSocket({})::listen(): address not available",
395 addr);
396 return crimson::ct_error::address_not_available::make();
397 }
398 logger().error("ShardedServerSocket({})::listen(): "
399 "got unexpeted error {}", addr, e.what());
400 ceph_abort();
401 });
402 }
403
404 seastar::future<>
405 ShardedServerSocket::accept(accept_func_t &&_fn_accept)
406 {
407 ceph_assert_always(seastar::this_shard_id() == primary_sid);
408 logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
409 return this->container().invoke_on_all([_fn_accept](auto &ss) {
410 assert(ss.listener);
411 ss.fn_accept = _fn_accept;
412 // gate accepting
413 // ShardedServerSocket::shutdown() will drain the continuations in the gate
414 // so ignore the returned future
415 std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
416 return seastar::keep_doing([&ss] {
417 return ss.listener->accept(
418 ).then([&ss](seastar::accept_result accept_result) {
419 #ifndef NDEBUG
420 if (ss.dispatch_only_on_primary_sid) {
421 // see seastar::listen_options::set_fixed_cpu()
422 ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
423 }
424 #endif
425 auto [socket, paddr] = std::move(accept_result);
426 entity_addr_t peer_addr;
427 peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
428 peer_addr.set_type(ss.listen_addr.get_type());
429 SocketRef _socket = std::make_unique<Socket>(
430 std::move(socket), Socket::side_t::acceptor,
431 peer_addr.get_port(), Socket::construct_tag{});
432 logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, "
433 "socket {}, dispatch_only_on_primary_sid = {}",
434 ss.listen_addr, peer_addr, fmt::ptr(_socket),
435 ss.dispatch_only_on_primary_sid);
436 std::ignore = seastar::with_gate(
437 ss.shutdown_gate,
438 [socket=std::move(_socket), peer_addr, &ss]() mutable {
439 return ss.fn_accept(std::move(socket), peer_addr
440 ).handle_exception([&ss, peer_addr](auto eptr) {
441 const char *e_what;
442 try {
443 std::rethrow_exception(eptr);
444 } catch (std::exception &e) {
445 e_what = e.what();
446 }
447 logger().error("ShardedServerSocket({})::accept(): "
448 "fn_accept(s, {}) got unexpected exception {}",
449 ss.listen_addr, peer_addr, e_what);
450 ceph_abort();
451 });
452 });
453 });
454 }).handle_exception_type([&ss](const std::system_error& e) {
455 if (e.code() == std::errc::connection_aborted ||
456 e.code() == std::errc::invalid_argument) {
457 logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
458 ss.listen_addr, e.what());
459 } else {
460 throw;
461 }
462 }).handle_exception([&ss](auto eptr) {
463 const char *e_what;
464 try {
465 std::rethrow_exception(eptr);
466 } catch (std::exception &e) {
467 e_what = e.what();
468 }
469 logger().error("ShardedServerSocket({})::accept(): "
470 "got unexpected exception {}", ss.listen_addr, e_what);
471 ceph_abort();
472 });
473 });
474 });
475 }
476
477 seastar::future<>
478 ShardedServerSocket::shutdown_destroy()
479 {
480 assert(seastar::this_shard_id() == primary_sid);
481 logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
482 // shutdown shards
483 return this->container().invoke_on_all([](auto& ss) {
484 if (ss.listener) {
485 ss.listener->abort_accept();
486 }
487 return ss.shutdown_gate.close();
488 }).then([this] {
489 // destroy shards
490 return this->container().invoke_on_all([](auto& ss) {
491 assert(ss.shutdown_gate.is_closed());
492 ss.listen_addr = entity_addr_t();
493 ss.listener.reset();
494 });
495 }).then([this] {
496 // stop the sharded service: we should only construct/stop shards on #0
497 return this->container().invoke_on(0, [](auto& ss) {
498 assert(ss.service);
499 return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
500 });
501 });
502 }
503
504 seastar::future<ShardedServerSocket*>
505 ShardedServerSocket::create(bool dispatch_only_on_this_shard)
506 {
507 auto primary_sid = seastar::this_shard_id();
508 // start the sharded service: we should only construct/stop shards on #0
509 return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] {
510 auto service = std::make_unique<sharded_service_t>();
511 return service->start(
512 primary_sid, dispatch_only_on_this_shard, construct_tag{}
513 ).then([service = std::move(service)]() mutable {
514 auto p_shard = service.get();
515 p_shard->local().service = std::move(service);
516 return p_shard;
517 });
518 }).then([](auto p_shard) {
519 return &p_shard->local();
520 });
521 }
522
523 } // namespace crimson::net