]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "Socket.h" | |
5 | ||
f67539c2 TL |
6 | #include <seastar/core/when_all.hh> |
7 | ||
9f95a23c | 8 | #include "crimson/common/log.h" |
11fdf7f2 TL |
9 | #include "Errors.h" |
10 | ||
9f95a23c | 11 | namespace crimson::net { |
11fdf7f2 TL |
12 | |
13 | namespace { | |
14 | ||
9f95a23c TL |
15 | seastar::logger& logger() { |
16 | return crimson::get_logger(ceph_subsys_ms); | |
17 | } | |
18 | ||
11fdf7f2 TL |
19 | // an input_stream consumer that reads buffer segments into a bufferlist up to |
20 | // the given number of remaining bytes | |
21 | struct bufferlist_consumer { | |
22 | bufferlist& bl; | |
23 | size_t& remaining; | |
24 | ||
25 | bufferlist_consumer(bufferlist& bl, size_t& remaining) | |
26 | : bl(bl), remaining(remaining) {} | |
27 | ||
28 | using tmp_buf = seastar::temporary_buffer<char>; | |
29 | using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; | |
30 | ||
31 | // consume some or all of a buffer segment | |
32 | seastar::future<consumption_result_type> operator()(tmp_buf&& data) { | |
33 | if (remaining >= data.size()) { | |
34 | // consume the whole buffer | |
35 | remaining -= data.size(); | |
36 | bl.append(buffer::create_foreign(std::move(data))); | |
37 | if (remaining > 0) { | |
38 | // return none to request more segments | |
39 | return seastar::make_ready_future<consumption_result_type>( | |
40 | seastar::continue_consuming{}); | |
41 | } else { | |
42 | // return an empty buffer to singal that we're done | |
43 | return seastar::make_ready_future<consumption_result_type>( | |
44 | consumption_result_type::stop_consuming_type({})); | |
45 | } | |
46 | } | |
47 | if (remaining > 0) { | |
48 | // consume the front | |
49 | bl.append(buffer::create_foreign(data.share(0, remaining))); | |
50 | data.trim_front(remaining); | |
51 | remaining = 0; | |
52 | } | |
53 | // give the rest back to signal that we're done | |
54 | return seastar::make_ready_future<consumption_result_type>( | |
55 | consumption_result_type::stop_consuming_type{std::move(data)}); | |
56 | }; | |
57 | }; | |
58 | ||
59 | } // anonymous namespace | |
60 | ||
61 | seastar::future<bufferlist> Socket::read(size_t bytes) | |
62 | { | |
9f95a23c TL |
63 | #ifdef UNIT_TESTS_BUILT |
64 | return try_trap_pre(next_trap_read).then([bytes, this] { | |
65 | #endif | |
66 | if (bytes == 0) { | |
67 | return seastar::make_ready_future<bufferlist>(); | |
68 | } | |
69 | r.buffer.clear(); | |
70 | r.remaining = bytes; | |
71 | return in.consume(bufferlist_consumer{r.buffer, r.remaining}).then([this] { | |
11fdf7f2 TL |
72 | if (r.remaining) { // throw on short reads |
73 | throw std::system_error(make_error_code(error::read_eof)); | |
74 | } | |
75 | return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); | |
76 | }); | |
9f95a23c TL |
77 | #ifdef UNIT_TESTS_BUILT |
78 | }).then([this] (auto buf) { | |
79 | return try_trap_post(next_trap_read | |
80 | ).then([buf = std::move(buf)] () mutable { | |
81 | return std::move(buf); | |
82 | }); | |
83 | }); | |
84 | #endif | |
11fdf7f2 TL |
85 | } |
86 | ||
87 | seastar::future<seastar::temporary_buffer<char>> | |
88 | Socket::read_exactly(size_t bytes) { | |
9f95a23c TL |
89 | #ifdef UNIT_TESTS_BUILT |
90 | return try_trap_pre(next_trap_read).then([bytes, this] { | |
91 | #endif | |
92 | if (bytes == 0) { | |
93 | return seastar::make_ready_future<seastar::temporary_buffer<char>>(); | |
94 | } | |
20effc67 TL |
95 | return in.read_exactly(bytes).then([bytes](auto buf) { |
96 | if (buf.size() < bytes) { | |
11fdf7f2 TL |
97 | throw std::system_error(make_error_code(error::read_eof)); |
98 | } | |
99 | return seastar::make_ready_future<tmp_buf>(std::move(buf)); | |
100 | }); | |
9f95a23c TL |
101 | #ifdef UNIT_TESTS_BUILT |
102 | }).then([this] (auto buf) { | |
103 | return try_trap_post(next_trap_read | |
104 | ).then([buf = std::move(buf)] () mutable { | |
105 | return std::move(buf); | |
106 | }); | |
107 | }); | |
108 | #endif | |
109 | } | |
110 | ||
111 | void Socket::shutdown() { | |
112 | socket.shutdown_input(); | |
113 | socket.shutdown_output(); | |
114 | } | |
115 | ||
116 | static inline seastar::future<> | |
117 | close_and_handle_errors(seastar::output_stream<char>& out) | |
118 | { | |
119 | return out.close().handle_exception_type([] (const std::system_error& e) { | |
120 | if (e.code() != std::errc::broken_pipe && | |
121 | e.code() != std::errc::connection_reset) { | |
122 | logger().error("Socket::close(): unexpected error {}", e); | |
123 | ceph_abort(); | |
124 | } | |
125 | // can happen when out is already shutdown, ignore | |
126 | }); | |
127 | } | |
128 | ||
129 | seastar::future<> Socket::close() { | |
130 | #ifndef NDEBUG | |
131 | ceph_assert(!closed); | |
132 | closed = true; | |
133 | #endif | |
134 | return seastar::when_all_succeed( | |
135 | in.close(), | |
136 | close_and_handle_errors(out) | |
f67539c2 TL |
137 | ).then_unpack([] { |
138 | return seastar::make_ready_future<>(); | |
139 | }).handle_exception([] (auto eptr) { | |
9f95a23c TL |
140 | logger().error("Socket::close(): unexpected exception {}", eptr); |
141 | ceph_abort(); | |
142 | }); | |
143 | } | |
144 | ||
145 | #ifdef UNIT_TESTS_BUILT | |
146 | seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { | |
147 | auto action = trap; | |
148 | trap = bp_action_t::CONTINUE; | |
149 | switch (action) { | |
150 | case bp_action_t::CONTINUE: | |
151 | break; | |
152 | case bp_action_t::FAULT: | |
153 | logger().info("[Test] got FAULT"); | |
154 | throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); | |
155 | case bp_action_t::BLOCK: | |
156 | logger().info("[Test] got BLOCK"); | |
157 | return blocker->block(); | |
158 | case bp_action_t::STALL: | |
159 | trap = action; | |
160 | break; | |
161 | default: | |
162 | ceph_abort("unexpected action from trap"); | |
163 | } | |
f67539c2 | 164 | return seastar::make_ready_future<>(); |
9f95a23c TL |
165 | } |
166 | ||
167 | seastar::future<> Socket::try_trap_post(bp_action_t& trap) { | |
168 | auto action = trap; | |
169 | trap = bp_action_t::CONTINUE; | |
170 | switch (action) { | |
171 | case bp_action_t::CONTINUE: | |
172 | break; | |
173 | case bp_action_t::STALL: | |
174 | logger().info("[Test] got STALL and block"); | |
175 | shutdown(); | |
176 | return blocker->block(); | |
177 | default: | |
178 | ceph_abort("unexpected action from trap"); | |
179 | } | |
f67539c2 | 180 | return seastar::make_ready_future<>(); |
9f95a23c TL |
181 | } |
182 | ||
183 | void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { | |
184 | blocker = blocker_; | |
185 | if (type == bp_type_t::READ) { | |
186 | ceph_assert(next_trap_read == bp_action_t::CONTINUE); | |
187 | next_trap_read = action; | |
188 | } else { // type == bp_type_t::WRITE | |
189 | if (next_trap_write == bp_action_t::CONTINUE) { | |
190 | next_trap_write = action; | |
191 | } else if (next_trap_write == bp_action_t::FAULT) { | |
192 | // do_sweep_messages() may combine multiple write events into one socket write | |
193 | ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); | |
194 | } else { | |
195 | ceph_abort(); | |
196 | } | |
197 | } | |
11fdf7f2 | 198 | } |
9f95a23c | 199 | #endif |
11fdf7f2 | 200 | |
20effc67 | 201 | crimson::net::listen_ertr::future<> |
f67539c2 TL |
202 | FixedCPUServerSocket::listen(entity_addr_t addr) |
203 | { | |
204 | assert(seastar::this_shard_id() == cpu); | |
205 | logger().trace("FixedCPUServerSocket::listen({})...", addr); | |
206 | return container().invoke_on_all([addr] (auto& ss) { | |
207 | ss.addr = addr; | |
208 | seastar::socket_address s_addr(addr.in4_addr()); | |
209 | seastar::listen_options lo; | |
210 | lo.reuse_address = true; | |
211 | lo.set_fixed_cpu(ss.cpu); | |
212 | ss.listener = seastar::listen(s_addr, lo); | |
213 | }).then([] { | |
20effc67 TL |
214 | return listen_ertr::now(); |
215 | }).handle_exception_type( | |
216 | [addr] (const std::system_error& e) -> listen_ertr::future<> { | |
f67539c2 TL |
217 | if (e.code() == std::errc::address_in_use) { |
218 | logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); | |
f67539c2 | 219 | return crimson::ct_error::address_in_use::make(); |
20effc67 TL |
220 | } else if (e.code() == std::errc::address_not_available) { |
221 | logger().trace("FixedCPUServerSocket::listen({}): address not available", | |
222 | addr); | |
223 | return crimson::ct_error::address_not_available::make(); | |
f67539c2 | 224 | } |
20effc67 TL |
225 | logger().error("FixedCPUServerSocket::listen({}): " |
226 | "got unexpeted error {}", addr, e); | |
227 | ceph_abort(); | |
f67539c2 TL |
228 | }); |
229 | } | |
230 | ||
231 | seastar::future<> FixedCPUServerSocket::shutdown() | |
232 | { | |
233 | assert(seastar::this_shard_id() == cpu); | |
234 | logger().trace("FixedCPUServerSocket({})::shutdown()...", addr); | |
235 | return container().invoke_on_all([] (auto& ss) { | |
236 | if (ss.listener) { | |
237 | ss.listener->abort_accept(); | |
238 | } | |
239 | return ss.shutdown_gate.close(); | |
240 | }).then([this] { | |
241 | return reset(); | |
242 | }); | |
243 | } | |
244 | ||
245 | seastar::future<> FixedCPUServerSocket::destroy() | |
246 | { | |
247 | assert(seastar::this_shard_id() == cpu); | |
248 | return shutdown().then([this] { | |
249 | // we should only construct/stop shards on #0 | |
250 | return container().invoke_on(0, [] (auto& ss) { | |
251 | assert(ss.service); | |
252 | return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); | |
253 | }); | |
254 | }); | |
255 | } | |
256 | ||
257 | seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create() | |
258 | { | |
259 | auto cpu = seastar::this_shard_id(); | |
260 | // we should only construct/stop shards on #0 | |
261 | return seastar::smp::submit_to(0, [cpu] { | |
262 | auto service = std::make_unique<sharded_service_t>(); | |
263 | return service->start(cpu, construct_tag{} | |
264 | ).then([service = std::move(service)] () mutable { | |
265 | auto p_shard = service.get(); | |
266 | p_shard->local().service = std::move(service); | |
267 | return p_shard; | |
268 | }); | |
269 | }).then([] (auto p_shard) { | |
270 | return &p_shard->local(); | |
271 | }); | |
272 | } | |
273 | ||
9f95a23c | 274 | } // namespace crimson::net |