]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/Protocol.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / net / Protocol.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "Protocol.h"
5
6#include "auth/Auth.h"
7
8#include "crimson/common/log.h"
9#include "crimson/net/Errors.h"
f67539c2 10#include "crimson/net/chained_dispatchers.h"
9f95a23c
TL
11#include "crimson/net/Socket.h"
12#include "crimson/net/SocketConnection.h"
13#include "msg/Message.h"
14
15namespace {
16 seastar::logger& logger() {
17 return crimson::get_logger(ceph_subsys_ms);
18 }
19}
20
21namespace crimson::net {
22
23Protocol::Protocol(proto_t type,
f67539c2 24 ChainedDispatchers& dispatchers,
9f95a23c
TL
25 SocketConnection& conn)
26 : proto_type(type),
f67539c2 27 dispatchers(dispatchers),
9f95a23c
TL
28 conn(conn),
29 auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
30{}
31
32Protocol::~Protocol()
33{
f67539c2 34 ceph_assert(gate.is_closed());
9f95a23c
TL
35 assert(!exit_open);
36}
37
f67539c2
TL
38void Protocol::close(bool dispatch_reset,
39 std::optional<std::function<void()>> f_accept_new)
9f95a23c
TL
40{
41 if (closed) {
42 // already closing
f67539c2 43 return;
9f95a23c
TL
44 }
45
f67539c2
TL
46 bool is_replace = f_accept_new ? true : false;
47 logger().info("{} closing: reset {}, replace {}", conn,
48 dispatch_reset ? "yes" : "no",
49 is_replace ? "yes" : "no");
9f95a23c 50
f67539c2
TL
51 // atomic operations
52 closed = true;
9f95a23c 53 trigger_close();
f67539c2
TL
54 if (f_accept_new) {
55 (*f_accept_new)();
56 }
9f95a23c
TL
57 if (socket) {
58 socket->shutdown();
9f95a23c 59 }
9f95a23c 60 set_write_state(write_state_t::drop);
f67539c2
TL
61 assert(!gate.is_closed());
62 auto gate_closed = gate.close();
63
64 if (dispatch_reset) {
65 dispatchers.ms_handle_reset(
66 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
67 is_replace);
68 }
9f95a23c 69
f67539c2
TL
70 // asynchronous operations
71 assert(!close_ready.valid());
72 close_ready = std::move(gate_closed).then([this] {
73 if (socket) {
74 return socket->close();
75 } else {
76 return seastar::now();
77 }
78 }).then([this] {
79 logger().debug("{} closed!", conn);
80 on_closed();
81#ifdef UNIT_TESTS_BUILT
82 is_closed_clean = true;
83 if (conn.interceptor) {
84 conn.interceptor->register_conn_closed(conn);
85 }
86#endif
87 }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
88 logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
89 ceph_abort();
90 });
9f95a23c
TL
91}
92
93seastar::future<> Protocol::send(MessageRef msg)
94{
95 if (write_state != write_state_t::drop) {
96 conn.out_q.push_back(std::move(msg));
97 write_event();
98 }
99 return seastar::now();
100}
101
102seastar::future<> Protocol::keepalive()
103{
104 if (!need_keepalive) {
105 need_keepalive = true;
106 write_event();
107 }
108 return seastar::now();
109}
110
111void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
112{
113 logger().trace("{} got keepalive ack {}", conn, _keepalive_ack);
114 keepalive_ack = _keepalive_ack;
115 write_event();
116}
117
118void Protocol::notify_ack()
119{
120 if (!conn.policy.lossy) {
121 ++ack_left;
122 write_event();
123 }
124}
125
126void Protocol::requeue_sent()
127{
128 assert(write_state != write_state_t::open);
129 if (conn.sent.empty()) {
130 return;
131 }
132
133 conn.out_seq -= conn.sent.size();
134 logger().debug("{} requeue {} items, revert out_seq to {}",
135 conn, conn.sent.size(), conn.out_seq);
136 for (MessageRef& msg : conn.sent) {
137 msg->clear_payload();
138 msg->set_seq(0);
139 }
140 conn.out_q.insert(conn.out_q.begin(),
141 std::make_move_iterator(conn.sent.begin()),
142 std::make_move_iterator(conn.sent.end()));
143 conn.sent.clear();
144 write_event();
145}
146
147void Protocol::requeue_up_to(seq_num_t seq)
148{
149 assert(write_state != write_state_t::open);
150 if (conn.sent.empty() && conn.out_q.empty()) {
151 logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
152 conn, conn.out_seq, seq);
153 conn.out_seq = seq;
154 return;
155 }
156 logger().debug("{} discarding sent items by seq {} (sent_len={}, out_seq={})",
157 conn, seq, conn.sent.size(), conn.out_seq);
158 while (!conn.sent.empty()) {
159 auto cur_seq = conn.sent.front()->get_seq();
160 if (cur_seq == 0 || cur_seq > seq) {
161 break;
162 } else {
163 conn.sent.pop_front();
164 }
165 }
166 requeue_sent();
167}
168
169void Protocol::reset_write()
170{
171 assert(write_state != write_state_t::open);
172 conn.out_seq = 0;
173 conn.out_q.clear();
174 conn.sent.clear();
175 need_keepalive = false;
176 keepalive_ack = std::nullopt;
177 ack_left = 0;
178}
179
180void Protocol::ack_writes(seq_num_t seq)
181{
182 if (conn.policy.lossy) { // lossy connections don't keep sent messages
183 return;
184 }
185 while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
186 logger().trace("{} got ack seq {} >= {}, pop {}",
187 conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
188 conn.sent.pop_front();
189 }
190}
191
192seastar::future<stop_t> Protocol::try_exit_sweep() {
193 assert(!is_queued());
194 return socket->flush().then([this] {
195 if (!is_queued()) {
196 // still nothing pending to send after flush,
197 // the dispatching can ONLY stop now
198 ceph_assert(write_dispatching);
199 write_dispatching = false;
200 if (unlikely(exit_open.has_value())) {
201 exit_open->set_value();
202 exit_open = std::nullopt;
203 logger().info("{} write_event: nothing queued at {},"
204 " set exit_open",
205 conn, get_state_name(write_state));
206 }
207 return seastar::make_ready_future<stop_t>(stop_t::yes);
208 } else {
209 // something is pending to send during flushing
210 return seastar::make_ready_future<stop_t>(stop_t::no);
211 }
212 });
213}
214
215seastar::future<> Protocol::do_write_dispatch_sweep()
216{
217 return seastar::repeat([this] {
218 switch (write_state) {
219 case write_state_t::open: {
220 size_t num_msgs = conn.out_q.size();
221 bool still_queued = is_queued();
222 if (unlikely(!still_queued)) {
223 return try_exit_sweep();
224 }
225 conn.pending_q.clear();
226 conn.pending_q.swap(conn.out_q);
227 if (!conn.policy.lossy) {
228 conn.sent.insert(conn.sent.end(),
229 conn.pending_q.begin(),
230 conn.pending_q.end());
231 }
232 auto acked = ack_left;
233 assert(acked == 0 || conn.in_seq > 0);
234 // sweep all pending writes with the concrete Protocol
235 return socket->write(do_sweep_messages(
236 conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
237 ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
238 need_keepalive = false;
239 if (keepalive_ack == prv_keepalive_ack) {
240 keepalive_ack = std::nullopt;
241 }
242 assert(ack_left >= acked);
243 ack_left -= acked;
244 if (!is_queued()) {
245 return try_exit_sweep();
246 } else {
247 // messages were enqueued during socket write
248 return seastar::make_ready_future<stop_t>(stop_t::no);
249 }
250 });
251 }
252 case write_state_t::delay:
253 // delay dispatching writes until open
254 if (exit_open) {
255 exit_open->set_value();
256 exit_open = std::nullopt;
257 logger().info("{} write_event: delay and set exit_open ...", conn);
258 } else {
259 logger().info("{} write_event: delay ...", conn);
260 }
261 return state_changed.get_shared_future()
262 .then([] { return stop_t::no; });
263 case write_state_t::drop:
264 ceph_assert(write_dispatching);
265 write_dispatching = false;
266 if (exit_open) {
267 exit_open->set_value();
268 exit_open = std::nullopt;
269 logger().info("{} write_event: dropped and set exit_open", conn);
270 } else {
271 logger().info("{} write_event: dropped", conn);
272 }
273 return seastar::make_ready_future<stop_t>(stop_t::yes);
274 default:
275 ceph_assert(false);
276 }
277 }).handle_exception_type([this] (const std::system_error& e) {
278 if (e.code() != std::errc::broken_pipe &&
279 e.code() != std::errc::connection_reset &&
280 e.code() != error::negotiation_failure) {
281 logger().error("{} write_event(): unexpected error at {} -- {}",
282 conn, get_state_name(write_state), e);
283 ceph_abort();
284 }
285 socket->shutdown();
286 if (write_state == write_state_t::open) {
287 logger().info("{} write_event(): fault at {}, going to delay -- {}",
288 conn, get_state_name(write_state), e);
289 write_state = write_state_t::delay;
290 } else {
291 logger().info("{} write_event(): fault at {} -- {}",
292 conn, get_state_name(write_state), e);
293 }
294 return do_write_dispatch_sweep();
295 });
296}
297
298void Protocol::write_event()
299{
300 notify_write();
301 if (write_dispatching) {
302 // already dispatching
303 return;
304 }
305 write_dispatching = true;
306 switch (write_state) {
307 case write_state_t::open:
308 [[fallthrough]];
309 case write_state_t::delay:
f67539c2
TL
310 assert(!gate.is_closed());
311 gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
312 return do_write_dispatch_sweep();
9f95a23c
TL
313 });
314 return;
315 case write_state_t::drop:
316 write_dispatching = false;
317 return;
318 default:
319 ceph_assert(false);
320 }
321}
322
323} // namespace crimson::net