]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "Protocol.h" | |
5 | ||
6 | #include "auth/Auth.h" | |
7 | ||
8 | #include "crimson/common/log.h" | |
9 | #include "crimson/net/Errors.h" | |
f67539c2 | 10 | #include "crimson/net/chained_dispatchers.h" |
9f95a23c TL |
11 | #include "crimson/net/Socket.h" |
12 | #include "crimson/net/SocketConnection.h" | |
13 | #include "msg/Message.h" | |
14 | ||
15 | namespace { | |
16 | seastar::logger& logger() { | |
17 | return crimson::get_logger(ceph_subsys_ms); | |
18 | } | |
19 | } | |
20 | ||
21 | namespace crimson::net { | |
22 | ||
23 | Protocol::Protocol(proto_t type, | |
f67539c2 | 24 | ChainedDispatchers& dispatchers, |
9f95a23c TL |
25 | SocketConnection& conn) |
26 | : proto_type(type), | |
f67539c2 | 27 | dispatchers(dispatchers), |
9f95a23c TL |
28 | conn(conn), |
29 | auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()} | |
30 | {} | |
31 | ||
32 | Protocol::~Protocol() | |
33 | { | |
f67539c2 | 34 | ceph_assert(gate.is_closed()); |
9f95a23c TL |
35 | assert(!exit_open); |
36 | } | |
37 | ||
f67539c2 TL |
38 | void Protocol::close(bool dispatch_reset, |
39 | std::optional<std::function<void()>> f_accept_new) | |
9f95a23c TL |
40 | { |
41 | if (closed) { | |
42 | // already closing | |
f67539c2 | 43 | return; |
9f95a23c TL |
44 | } |
45 | ||
f67539c2 TL |
46 | bool is_replace = f_accept_new ? true : false; |
47 | logger().info("{} closing: reset {}, replace {}", conn, | |
48 | dispatch_reset ? "yes" : "no", | |
49 | is_replace ? "yes" : "no"); | |
9f95a23c | 50 | |
f67539c2 TL |
51 | // atomic operations |
52 | closed = true; | |
9f95a23c | 53 | trigger_close(); |
f67539c2 TL |
54 | if (f_accept_new) { |
55 | (*f_accept_new)(); | |
56 | } | |
9f95a23c TL |
57 | if (socket) { |
58 | socket->shutdown(); | |
9f95a23c | 59 | } |
9f95a23c | 60 | set_write_state(write_state_t::drop); |
f67539c2 TL |
61 | assert(!gate.is_closed()); |
62 | auto gate_closed = gate.close(); | |
63 | ||
64 | if (dispatch_reset) { | |
65 | dispatchers.ms_handle_reset( | |
66 | seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()), | |
67 | is_replace); | |
68 | } | |
9f95a23c | 69 | |
f67539c2 TL |
70 | // asynchronous operations |
71 | assert(!close_ready.valid()); | |
72 | close_ready = std::move(gate_closed).then([this] { | |
73 | if (socket) { | |
74 | return socket->close(); | |
75 | } else { | |
76 | return seastar::now(); | |
77 | } | |
78 | }).then([this] { | |
79 | logger().debug("{} closed!", conn); | |
80 | on_closed(); | |
81 | #ifdef UNIT_TESTS_BUILT | |
82 | is_closed_clean = true; | |
83 | if (conn.interceptor) { | |
84 | conn.interceptor->register_conn_closed(conn); | |
85 | } | |
86 | #endif | |
87 | }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { | |
88 | logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr); | |
89 | ceph_abort(); | |
90 | }); | |
9f95a23c TL |
91 | } |
92 | ||
93 | seastar::future<> Protocol::send(MessageRef msg) | |
94 | { | |
95 | if (write_state != write_state_t::drop) { | |
96 | conn.out_q.push_back(std::move(msg)); | |
97 | write_event(); | |
98 | } | |
99 | return seastar::now(); | |
100 | } | |
101 | ||
102 | seastar::future<> Protocol::keepalive() | |
103 | { | |
104 | if (!need_keepalive) { | |
105 | need_keepalive = true; | |
106 | write_event(); | |
107 | } | |
108 | return seastar::now(); | |
109 | } | |
110 | ||
111 | void Protocol::notify_keepalive_ack(utime_t _keepalive_ack) | |
112 | { | |
113 | logger().trace("{} got keepalive ack {}", conn, _keepalive_ack); | |
114 | keepalive_ack = _keepalive_ack; | |
115 | write_event(); | |
116 | } | |
117 | ||
118 | void Protocol::notify_ack() | |
119 | { | |
120 | if (!conn.policy.lossy) { | |
121 | ++ack_left; | |
122 | write_event(); | |
123 | } | |
124 | } | |
125 | ||
126 | void Protocol::requeue_sent() | |
127 | { | |
128 | assert(write_state != write_state_t::open); | |
129 | if (conn.sent.empty()) { | |
130 | return; | |
131 | } | |
132 | ||
133 | conn.out_seq -= conn.sent.size(); | |
134 | logger().debug("{} requeue {} items, revert out_seq to {}", | |
135 | conn, conn.sent.size(), conn.out_seq); | |
136 | for (MessageRef& msg : conn.sent) { | |
137 | msg->clear_payload(); | |
138 | msg->set_seq(0); | |
139 | } | |
140 | conn.out_q.insert(conn.out_q.begin(), | |
141 | std::make_move_iterator(conn.sent.begin()), | |
142 | std::make_move_iterator(conn.sent.end())); | |
143 | conn.sent.clear(); | |
144 | write_event(); | |
145 | } | |
146 | ||
147 | void Protocol::requeue_up_to(seq_num_t seq) | |
148 | { | |
149 | assert(write_state != write_state_t::open); | |
150 | if (conn.sent.empty() && conn.out_q.empty()) { | |
151 | logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}", | |
152 | conn, conn.out_seq, seq); | |
153 | conn.out_seq = seq; | |
154 | return; | |
155 | } | |
156 | logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})", | |
157 | conn, seq, conn.sent.size(), conn.out_seq); | |
158 | while (!conn.sent.empty()) { | |
159 | auto cur_seq = conn.sent.front()->get_seq(); | |
160 | if (cur_seq == 0 || cur_seq > seq) { | |
161 | break; | |
162 | } else { | |
163 | conn.sent.pop_front(); | |
164 | } | |
165 | } | |
166 | requeue_sent(); | |
167 | } | |
168 | ||
169 | void Protocol::reset_write() | |
170 | { | |
171 | assert(write_state != write_state_t::open); | |
172 | conn.out_seq = 0; | |
173 | conn.out_q.clear(); | |
174 | conn.sent.clear(); | |
175 | need_keepalive = false; | |
176 | keepalive_ack = std::nullopt; | |
177 | ack_left = 0; | |
178 | } | |
179 | ||
180 | void Protocol::ack_writes(seq_num_t seq) | |
181 | { | |
182 | if (conn.policy.lossy) { // lossy connections don't keep sent messages | |
183 | return; | |
184 | } | |
185 | while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) { | |
186 | logger().trace("{} got ack seq {} >= {}, pop {}", | |
187 | conn, seq, conn.sent.front()->get_seq(), conn.sent.front()); | |
188 | conn.sent.pop_front(); | |
189 | } | |
190 | } | |
191 | ||
192 | seastar::future<stop_t> Protocol::try_exit_sweep() { | |
193 | assert(!is_queued()); | |
194 | return socket->flush().then([this] { | |
195 | if (!is_queued()) { | |
196 | // still nothing pending to send after flush, | |
197 | // the dispatching can ONLY stop now | |
198 | ceph_assert(write_dispatching); | |
199 | write_dispatching = false; | |
200 | if (unlikely(exit_open.has_value())) { | |
201 | exit_open->set_value(); | |
202 | exit_open = std::nullopt; | |
203 | logger().info("{} write_event: nothing queued at {}," | |
204 | " set exit_open", | |
205 | conn, get_state_name(write_state)); | |
206 | } | |
207 | return seastar::make_ready_future<stop_t>(stop_t::yes); | |
208 | } else { | |
209 | // something is pending to send during flushing | |
210 | return seastar::make_ready_future<stop_t>(stop_t::no); | |
211 | } | |
212 | }); | |
213 | } | |
214 | ||
215 | seastar::future<> Protocol::do_write_dispatch_sweep() | |
216 | { | |
217 | return seastar::repeat([this] { | |
218 | switch (write_state) { | |
219 | case write_state_t::open: { | |
220 | size_t num_msgs = conn.out_q.size(); | |
221 | bool still_queued = is_queued(); | |
222 | if (unlikely(!still_queued)) { | |
223 | return try_exit_sweep(); | |
224 | } | |
225 | conn.pending_q.clear(); | |
226 | conn.pending_q.swap(conn.out_q); | |
227 | if (!conn.policy.lossy) { | |
228 | conn.sent.insert(conn.sent.end(), | |
229 | conn.pending_q.begin(), | |
230 | conn.pending_q.end()); | |
231 | } | |
232 | auto acked = ack_left; | |
233 | assert(acked == 0 || conn.in_seq > 0); | |
234 | // sweep all pending writes with the concrete Protocol | |
235 | return socket->write(do_sweep_messages( | |
236 | conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0) | |
237 | ).then([this, prv_keepalive_ack=keepalive_ack, acked] { | |
238 | need_keepalive = false; | |
239 | if (keepalive_ack == prv_keepalive_ack) { | |
240 | keepalive_ack = std::nullopt; | |
241 | } | |
242 | assert(ack_left >= acked); | |
243 | ack_left -= acked; | |
244 | if (!is_queued()) { | |
245 | return try_exit_sweep(); | |
246 | } else { | |
247 | // messages were enqueued during socket write | |
248 | return seastar::make_ready_future<stop_t>(stop_t::no); | |
249 | } | |
250 | }); | |
251 | } | |
252 | case write_state_t::delay: | |
253 | // delay dispatching writes until open | |
254 | if (exit_open) { | |
255 | exit_open->set_value(); | |
256 | exit_open = std::nullopt; | |
257 | logger().info("{} write_event: delay and set exit_open ...", conn); | |
258 | } else { | |
259 | logger().info("{} write_event: delay ...", conn); | |
260 | } | |
261 | return state_changed.get_shared_future() | |
262 | .then([] { return stop_t::no; }); | |
263 | case write_state_t::drop: | |
264 | ceph_assert(write_dispatching); | |
265 | write_dispatching = false; | |
266 | if (exit_open) { | |
267 | exit_open->set_value(); | |
268 | exit_open = std::nullopt; | |
269 | logger().info("{} write_event: dropped and set exit_open", conn); | |
270 | } else { | |
271 | logger().info("{} write_event: dropped", conn); | |
272 | } | |
273 | return seastar::make_ready_future<stop_t>(stop_t::yes); | |
274 | default: | |
275 | ceph_assert(false); | |
276 | } | |
277 | }).handle_exception_type([this] (const std::system_error& e) { | |
278 | if (e.code() != std::errc::broken_pipe && | |
279 | e.code() != std::errc::connection_reset && | |
280 | e.code() != error::negotiation_failure) { | |
281 | logger().error("{} write_event(): unexpected error at {} -- {}", | |
282 | conn, get_state_name(write_state), e); | |
283 | ceph_abort(); | |
284 | } | |
285 | socket->shutdown(); | |
286 | if (write_state == write_state_t::open) { | |
287 | logger().info("{} write_event(): fault at {}, going to delay -- {}", | |
288 | conn, get_state_name(write_state), e); | |
289 | write_state = write_state_t::delay; | |
290 | } else { | |
291 | logger().info("{} write_event(): fault at {} -- {}", | |
292 | conn, get_state_name(write_state), e); | |
293 | } | |
294 | return do_write_dispatch_sweep(); | |
295 | }); | |
296 | } | |
297 | ||
298 | void Protocol::write_event() | |
299 | { | |
300 | notify_write(); | |
301 | if (write_dispatching) { | |
302 | // already dispatching | |
303 | return; | |
304 | } | |
305 | write_dispatching = true; | |
306 | switch (write_state) { | |
307 | case write_state_t::open: | |
308 | [[fallthrough]]; | |
309 | case write_state_t::delay: | |
f67539c2 TL |
310 | assert(!gate.is_closed()); |
311 | gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] { | |
312 | return do_write_dispatch_sweep(); | |
9f95a23c TL |
313 | }); |
314 | return; | |
315 | case write_state_t::drop: | |
316 | write_dispatching = false; | |
317 | return; | |
318 | default: | |
319 | ceph_assert(false); | |
320 | } | |
321 | } | |
322 | ||
323 | } // namespace crimson::net |