]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/Socket.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / Socket.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "Socket.h"
5
f67539c2
TL
6#include <seastar/core/when_all.hh>
7
9f95a23c 8#include "crimson/common/log.h"
11fdf7f2
TL
9#include "Errors.h"
10
9f95a23c 11namespace crimson::net {
11fdf7f2
TL
12
13namespace {
14
9f95a23c
TL
15seastar::logger& logger() {
16 return crimson::get_logger(ceph_subsys_ms);
17}
18
11fdf7f2
TL
19// an input_stream consumer that reads buffer segments into a bufferlist up to
20// the given number of remaining bytes
21struct bufferlist_consumer {
22 bufferlist& bl;
23 size_t& remaining;
24
25 bufferlist_consumer(bufferlist& bl, size_t& remaining)
26 : bl(bl), remaining(remaining) {}
27
28 using tmp_buf = seastar::temporary_buffer<char>;
29 using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
30
31 // consume some or all of a buffer segment
32 seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
33 if (remaining >= data.size()) {
34 // consume the whole buffer
35 remaining -= data.size();
36 bl.append(buffer::create_foreign(std::move(data)));
37 if (remaining > 0) {
38 // return none to request more segments
39 return seastar::make_ready_future<consumption_result_type>(
40 seastar::continue_consuming{});
41 } else {
42 // return an empty buffer to singal that we're done
43 return seastar::make_ready_future<consumption_result_type>(
44 consumption_result_type::stop_consuming_type({}));
45 }
46 }
47 if (remaining > 0) {
48 // consume the front
49 bl.append(buffer::create_foreign(data.share(0, remaining)));
50 data.trim_front(remaining);
51 remaining = 0;
52 }
53 // give the rest back to signal that we're done
54 return seastar::make_ready_future<consumption_result_type>(
55 consumption_result_type::stop_consuming_type{std::move(data)});
56 };
57};
58
59} // anonymous namespace
60
61seastar::future<bufferlist> Socket::read(size_t bytes)
62{
9f95a23c
TL
63#ifdef UNIT_TESTS_BUILT
64 return try_trap_pre(next_trap_read).then([bytes, this] {
65#endif
66 if (bytes == 0) {
67 return seastar::make_ready_future<bufferlist>();
68 }
69 r.buffer.clear();
70 r.remaining = bytes;
71 return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] {
11fdf7f2
TL
72 if (r.remaining) { // throw on short reads
73 throw std::system_error(make_error_code(error::read_eof));
74 }
75 return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
76 });
9f95a23c
TL
77#ifdef UNIT_TESTS_BUILT
78 }).then([this] (auto buf) {
79 return try_trap_post(next_trap_read
80 ).then([buf = std::move(buf)] () mutable {
81 return std::move(buf);
82 });
83 });
84#endif
11fdf7f2
TL
85}
86
87seastar::future<seastar::temporary_buffer<char>>
88Socket::read_exactly(size_t bytes) {
9f95a23c
TL
89#ifdef UNIT_TESTS_BUILT
90 return try_trap_pre(next_trap_read).then([bytes, this] {
91#endif
92 if (bytes == 0) {
93 return seastar::make_ready_future<seastar::temporary_buffer<char>>();
94 }
20effc67
TL
95 return in.read_exactly(bytes).then([bytes](auto buf) {
96 if (buf.size() < bytes) {
11fdf7f2
TL
97 throw std::system_error(make_error_code(error::read_eof));
98 }
99 return seastar::make_ready_future<tmp_buf>(std::move(buf));
100 });
9f95a23c
TL
101#ifdef UNIT_TESTS_BUILT
102 }).then([this] (auto buf) {
103 return try_trap_post(next_trap_read
104 ).then([buf = std::move(buf)] () mutable {
105 return std::move(buf);
106 });
107 });
108#endif
109}
110
111void Socket::shutdown() {
112 socket.shutdown_input();
113 socket.shutdown_output();
114}
115
116static inline seastar::future<>
117close_and_handle_errors(seastar::output_stream<char>& out)
118{
119 return out.close().handle_exception_type([] (const std::system_error& e) {
120 if (e.code() != std::errc::broken_pipe &&
121 e.code() != std::errc::connection_reset) {
122 logger().error("Socket::close(): unexpected error {}", e);
123 ceph_abort();
124 }
125 // can happen when out is already shutdown, ignore
126 });
127}
128
129seastar::future<> Socket::close() {
130#ifndef NDEBUG
131 ceph_assert(!closed);
132 closed = true;
133#endif
134 return seastar::when_all_succeed(
135 in.close(),
136 close_and_handle_errors(out)
f67539c2
TL
137 ).then_unpack([] {
138 return seastar::make_ready_future<>();
139 }).handle_exception([] (auto eptr) {
9f95a23c
TL
140 logger().error("Socket::close(): unexpected exception {}", eptr);
141 ceph_abort();
142 });
143}
144
145#ifdef UNIT_TESTS_BUILT
146seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
147 auto action = trap;
148 trap = bp_action_t::CONTINUE;
149 switch (action) {
150 case bp_action_t::CONTINUE:
151 break;
152 case bp_action_t::FAULT:
153 logger().info("[Test] got FAULT");
154 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
155 case bp_action_t::BLOCK:
156 logger().info("[Test] got BLOCK");
157 return blocker->block();
158 case bp_action_t::STALL:
159 trap = action;
160 break;
161 default:
162 ceph_abort("unexpected action from trap");
163 }
f67539c2 164 return seastar::make_ready_future<>();
9f95a23c
TL
165}
166
167seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
168 auto action = trap;
169 trap = bp_action_t::CONTINUE;
170 switch (action) {
171 case bp_action_t::CONTINUE:
172 break;
173 case bp_action_t::STALL:
174 logger().info("[Test] got STALL and block");
175 shutdown();
176 return blocker->block();
177 default:
178 ceph_abort("unexpected action from trap");
179 }
f67539c2 180 return seastar::make_ready_future<>();
9f95a23c
TL
181}
182
183void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
184 blocker = blocker_;
185 if (type == bp_type_t::READ) {
186 ceph_assert(next_trap_read == bp_action_t::CONTINUE);
187 next_trap_read = action;
188 } else { // type == bp_type_t::WRITE
189 if (next_trap_write == bp_action_t::CONTINUE) {
190 next_trap_write = action;
191 } else if (next_trap_write == bp_action_t::FAULT) {
192 // do_sweep_messages() may combine multiple write events into one socket write
193 ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
194 } else {
195 ceph_abort();
196 }
197 }
11fdf7f2 198}
9f95a23c 199#endif
11fdf7f2 200
20effc67 201crimson::net::listen_ertr::future<>
f67539c2
TL
202FixedCPUServerSocket::listen(entity_addr_t addr)
203{
204 assert(seastar::this_shard_id() == cpu);
205 logger().trace("FixedCPUServerSocket::listen({})...", addr);
206 return container().invoke_on_all([addr] (auto& ss) {
207 ss.addr = addr;
208 seastar::socket_address s_addr(addr.in4_addr());
209 seastar::listen_options lo;
210 lo.reuse_address = true;
211 lo.set_fixed_cpu(ss.cpu);
212 ss.listener = seastar::listen(s_addr, lo);
213 }).then([] {
20effc67
TL
214 return listen_ertr::now();
215 }).handle_exception_type(
216 [addr] (const std::system_error& e) -> listen_ertr::future<> {
f67539c2
TL
217 if (e.code() == std::errc::address_in_use) {
218 logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
f67539c2 219 return crimson::ct_error::address_in_use::make();
20effc67
TL
220 } else if (e.code() == std::errc::address_not_available) {
221 logger().trace("FixedCPUServerSocket::listen({}): address not available",
222 addr);
223 return crimson::ct_error::address_not_available::make();
f67539c2 224 }
20effc67
TL
225 logger().error("FixedCPUServerSocket::listen({}): "
226 "got unexpeted error {}", addr, e);
227 ceph_abort();
f67539c2
TL
228 });
229}
230
231seastar::future<> FixedCPUServerSocket::shutdown()
232{
233 assert(seastar::this_shard_id() == cpu);
234 logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
235 return container().invoke_on_all([] (auto& ss) {
236 if (ss.listener) {
237 ss.listener->abort_accept();
238 }
239 return ss.shutdown_gate.close();
240 }).then([this] {
241 return reset();
242 });
243}
244
245seastar::future<> FixedCPUServerSocket::destroy()
246{
247 assert(seastar::this_shard_id() == cpu);
248 return shutdown().then([this] {
249 // we should only construct/stop shards on #0
250 return container().invoke_on(0, [] (auto& ss) {
251 assert(ss.service);
252 return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
253 });
254 });
255}
256
257seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create()
258{
259 auto cpu = seastar::this_shard_id();
260 // we should only construct/stop shards on #0
261 return seastar::smp::submit_to(0, [cpu] {
262 auto service = std::make_unique<sharded_service_t>();
263 return service->start(cpu, construct_tag{}
264 ).then([service = std::move(service)] () mutable {
265 auto p_shard = service.get();
266 p_shard->local().service = std::move(service);
267 return p_shard;
268 });
269 }).then([] (auto p_shard) {
270 return &p_shard->local();
271 });
272}
273
9f95a23c 274} // namespace crimson::net