]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV1.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / crimson / net / ProtocolV1.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "ProtocolV1.h"

#include <cstring>

#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/net/packet.hh>

#include "include/msgr.h"
#include "include/random.h"
#include "auth/Auth.h"
#include "auth/AuthSessionHandler.h"

#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
#include "crimson/common/log.h"
#include "chained_dispatchers.h"
#include "Errors.h"
#include "Socket.h"
#include "SocketConnection.h"
#include "SocketMessenger.h"
23
// encode()/decode() overloads for the fixed-size v1 handshake structs
// (presumably a raw byte-for-byte copy, as the macro name suggests); they
// back ::decode(h.reply, p) and ::decode(h.connect, p) further down.
WRITE_RAW_ENCODER(ceph_msg_connect);
WRITE_RAW_ENCODER(ceph_msg_connect_reply);

// local_conf() supplies the cephx_* options read by require_auth_feature()
// and require_cephx_v2_feature().
using crimson::common::local_conf;
28
29 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
30 {
31 return out << "connect{features=" << std::hex << c.features << std::dec
32 << " host_type=" << c.host_type
33 << " global_seq=" << c.global_seq
34 << " connect_seq=" << c.connect_seq
35 << " protocol_version=" << c.protocol_version
36 << " authorizer_protocol=" << c.authorizer_protocol
37 << " authorizer_len=" << c.authorizer_len
38 << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
39 }
40
41 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
42 {
43 return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
44 << " features=" << std::hex << r.features << std::dec
45 << " global_seq=" << r.global_seq
46 << " connect_seq=" << r.connect_seq
47 << " protocol_version=" << r.protocol_version
48 << " authorizer_len=" << r.authorizer_len
49 << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
50 }
51
52 namespace {
53
54 seastar::logger& logger() {
55 return crimson::get_logger(ceph_subsys_ms);
56 }
57
58 template <typename T>
59 seastar::net::packet make_static_packet(const T& value) {
60 return { reinterpret_cast<const char*>(&value), sizeof(value) };
61 }
62
63 // store the banner in a non-const string for buffer::create_static()
64 char banner[] = CEPH_BANNER;
65 constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
66
67 constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
68 constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
69
70 // check that the buffer starts with a valid banner without requiring it to
71 // be contiguous in memory
72 void validate_banner(bufferlist::const_iterator& p)
73 {
74 auto b = std::cbegin(banner);
75 auto end = b + banner_size;
76 while (b != end) {
77 const char *buf{nullptr};
78 auto remaining = std::distance(b, end);
79 auto len = p.get_ptr_and_advance(remaining, &buf);
80 if (!std::equal(buf, buf + len, b)) {
81 throw std::system_error(
82 make_error_code(crimson::net::error::bad_connect_banner));
83 }
84 b += len;
85 }
86 }
87
88 // return a static bufferptr to the given object
89 template <typename T>
90 bufferptr create_static(T& obj)
91 {
92 return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
93 }
94
95 uint32_t get_proto_version(entity_type_t peer_type, bool connect)
96 {
97 constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
98 // see also OSD.h, unlike other connection of simple/async messenger,
99 // crimson msgr is only used by osd
100 constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
101 if (peer_type == my_type) {
102 // internal
103 return CEPH_OSD_PROTOCOL;
104 } else {
105 // public
106 switch (connect ? peer_type : my_type) {
107 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
108 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
109 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
110 default: return 0;
111 }
112 }
113 }
114
115 void discard_up_to(std::deque<MessageRef>* queue,
116 crimson::net::seq_num_t seq)
117 {
118 while (!queue->empty() &&
119 queue->front()->get_seq() < seq) {
120 queue->pop_front();
121 }
122 }
123
124 } // namespace anonymous
125
126 namespace crimson::net {
127
128 ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers,
129 SocketConnection& conn,
130 SocketMessenger& messenger)
131 : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {}
132
133 ProtocolV1::~ProtocolV1() {}
134
135 bool ProtocolV1::is_connected() const
136 {
137 return state == state_t::open;
138 }
139
140 // connecting state
141
142 void ProtocolV1::reset_session()
143 {
144 conn.out_q = {};
145 conn.sent = {};
146 conn.in_seq = 0;
147 h.connect_seq = 0;
148 if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
149 // Set out_seq to a random value, so CRC won't be predictable.
150 // Constant to limit starting sequence number to 2^31. Nothing special
151 // about it, just a big number.
152 constexpr uint64_t SEQ_MASK = 0x7fffffff;
153 conn.out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
154 } else {
155 // previously, seq #'s always started at 0.
156 conn.out_seq = 0;
157 }
158 }
159
160 seastar::future<stop_t>
161 ProtocolV1::handle_connect_reply(msgr_tag_t tag)
162 {
163 if (h.auth_payload.length() && !conn.peer_is_mon()) {
164 if (tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { // more
165 h.auth_more = messenger.get_auth_client()->handle_auth_reply_more(
166 conn.shared_from_this(), auth_meta, h.auth_payload);
167 return seastar::make_ready_future<stop_t>(stop_t::no);
168 } else {
169 int ret = messenger.get_auth_client()->handle_auth_done(
170 conn.shared_from_this(), auth_meta, 0, 0, h.auth_payload);
171 if (ret < 0) {
172 // fault
173 logger().warn("{} AuthClient::handle_auth_done() return {}", conn, ret);
174 throw std::system_error(make_error_code(error::negotiation_failure));
175 }
176 }
177 }
178
179 switch (tag) {
180 case CEPH_MSGR_TAG_FEATURES:
181 logger().error("{} connect protocol feature mispatch", __func__);
182 throw std::system_error(make_error_code(error::negotiation_failure));
183 case CEPH_MSGR_TAG_BADPROTOVER:
184 logger().error("{} connect protocol version mispatch", __func__);
185 throw std::system_error(make_error_code(error::negotiation_failure));
186 case CEPH_MSGR_TAG_BADAUTHORIZER:
187 logger().error("{} got bad authorizer", __func__);
188 throw std::system_error(make_error_code(error::negotiation_failure));
189 case CEPH_MSGR_TAG_RESETSESSION:
190 reset_session();
191 return seastar::make_ready_future<stop_t>(stop_t::no);
192 case CEPH_MSGR_TAG_RETRY_GLOBAL:
193 return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) {
194 h.global_seq = gs;
195 return seastar::make_ready_future<stop_t>(stop_t::no);
196 });
197 case CEPH_MSGR_TAG_RETRY_SESSION:
198 ceph_assert(h.reply.connect_seq > h.connect_seq);
199 h.connect_seq = h.reply.connect_seq;
200 return seastar::make_ready_future<stop_t>(stop_t::no);
201 case CEPH_MSGR_TAG_WAIT:
202 // TODO: state wait
203 throw std::system_error(make_error_code(error::negotiation_failure));
204 case CEPH_MSGR_TAG_SEQ:
205 case CEPH_MSGR_TAG_READY:
206 if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features);
207 missing) {
208 logger().error("{} missing required features", __func__);
209 throw std::system_error(make_error_code(error::negotiation_failure));
210 }
211 return seastar::futurize_invoke([this, tag] {
212 if (tag == CEPH_MSGR_TAG_SEQ) {
213 return socket->read_exactly(sizeof(seq_num_t))
214 .then([this] (auto buf) {
215 auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
216 discard_up_to(&conn.out_q, *acked_seq);
217 return socket->write_flush(make_static_packet(conn.in_seq));
218 });
219 }
220 // tag CEPH_MSGR_TAG_READY
221 return seastar::now();
222 }).then([this] {
223 // hooray!
224 h.peer_global_seq = h.reply.global_seq;
225 conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
226 h.connect_seq++;
227 h.backoff = 0ms;
228 conn.set_features(h.reply.features & h.connect.features);
229 if (auth_meta->authorizer) {
230 session_security.reset(
231 get_auth_session_handler(nullptr,
232 auth_meta->authorizer->protocol,
233 auth_meta->session_key,
234 conn.features));
235 } else {
236 session_security.reset();
237 }
238 return seastar::make_ready_future<stop_t>(stop_t::yes);
239 });
240 break;
241 default:
242 // unknown tag
243 logger().error("{} got unknown tag", __func__, int(tag));
244 throw std::system_error(make_error_code(error::negotiation_failure));
245 }
246 }
247
248 ceph::bufferlist ProtocolV1::get_auth_payload()
249 {
250 // only non-mons connectings to mons use MAuth messages
251 if (conn.peer_is_mon() &&
252 messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) {
253 return {};
254 } else {
255 if (h.auth_more.length()) {
256 logger().info("using augmented (challenge) auth payload");
257 return std::move(h.auth_more);
258 } else {
259 auto [auth_method, preferred_modes, auth_bl] =
260 messenger.get_auth_client()->get_auth_request(
261 conn.shared_from_this(), auth_meta);
262 auth_meta->auth_method = auth_method;
263 return auth_bl;
264 }
265 }
266 }
267
268 seastar::future<stop_t>
269 ProtocolV1::repeat_connect()
270 {
271 // encode ceph_msg_connect
272 memset(&h.connect, 0, sizeof(h.connect));
273 h.connect.features = conn.policy.features_supported;
274 h.connect.host_type = messenger.get_myname().type();
275 h.connect.global_seq = h.global_seq;
276 h.connect.connect_seq = h.connect_seq;
277 h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true);
278 // this is fyi, actually, server decides!
279 h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
280
281 ceph_assert(messenger.get_auth_client());
282
283 bufferlist bl;
284 bufferlist auth_bl = get_auth_payload();
285 if (auth_bl.length()) {
286 h.connect.authorizer_protocol = auth_meta->auth_method;
287 h.connect.authorizer_len = auth_bl.length();
288 bl.append(create_static(h.connect));
289 bl.claim_append(auth_bl);
290 } else {
291 h.connect.authorizer_protocol = 0;
292 h.connect.authorizer_len = 0;
293 bl.append(create_static(h.connect));
294 };
295 return socket->write_flush(std::move(bl))
296 .then([this] {
297 // read the reply
298 return socket->read(sizeof(h.reply));
299 }).then([this] (bufferlist bl) {
300 auto p = bl.cbegin();
301 ::decode(h.reply, p);
302 ceph_assert(p.end());
303 return socket->read(h.reply.authorizer_len);
304 }).then([this] (bufferlist bl) {
305 h.auth_payload = std::move(bl);
306 return handle_connect_reply(h.reply.tag);
307 });
308 }
309
310 void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
311 const entity_name_t& _peer_name)
312 {
313 ceph_assert(state == state_t::none);
314 logger().trace("{} trigger connecting, was {}", conn, static_cast<int>(state));
315 state = state_t::connecting;
316 set_write_state(write_state_t::delay);
317
318 ceph_assert(!socket);
319 ceph_assert(!gate.is_closed());
320 conn.peer_addr = _peer_addr;
321 conn.target_addr = _peer_addr;
322 conn.set_peer_name(_peer_name);
323 conn.policy = messenger.get_policy(_peer_name.type());
324 messenger.register_conn(
325 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
326 gate.dispatch_in_background("start_connect", *this, [this] {
327 return Socket::connect(conn.peer_addr)
328 .then([this](SocketRef sock) {
329 socket = std::move(sock);
330 if (state != state_t::connecting) {
331 assert(state == state_t::closing);
332 return socket->close().then([] {
333 throw std::system_error(make_error_code(error::protocol_aborted));
334 });
335 }
336 return seastar::now();
337 }).then([this] {
338 return messenger.get_global_seq();
339 }).then([this] (auto gs) {
340 h.global_seq = gs;
341 // read server's handshake header
342 return socket->read(server_header_size);
343 }).then([this] (bufferlist headerbl) {
344 auto p = headerbl.cbegin();
345 validate_banner(p);
346 entity_addr_t saddr, caddr;
347 ::decode(saddr, p);
348 ::decode(caddr, p);
349 ceph_assert(p.end());
350 if (saddr != conn.peer_addr) {
351 logger().error("{} my peer_addr {} doesn't match what peer advertized {}",
352 conn, conn.peer_addr, saddr);
353 throw std::system_error(
354 make_error_code(crimson::net::error::bad_peer_address));
355 }
356 if (state != state_t::connecting) {
357 assert(state == state_t::closing);
358 throw std::system_error(make_error_code(error::protocol_aborted));
359 }
360 socket->learn_ephemeral_port_as_connector(caddr.get_port());
361 if (unlikely(caddr.is_msgr2())) {
362 logger().warn("{} peer sent a v2 address for me: {}",
363 conn, caddr);
364 throw std::system_error(
365 make_error_code(crimson::net::error::bad_peer_address));
366 }
367 caddr.set_type(entity_addr_t::TYPE_LEGACY);
368 return messenger.learned_addr(caddr, conn);
369 }).then([this] {
370 // encode/send client's handshake header
371 bufferlist bl;
372 bl.append(buffer::create_static(banner_size, banner));
373 ::encode(messenger.get_myaddr(), bl, 0);
374 return socket->write_flush(std::move(bl));
375 }).then([=] {
376 return seastar::repeat([this] {
377 return repeat_connect();
378 });
379 }).then([this] {
380 if (state != state_t::connecting) {
381 assert(state == state_t::closing);
382 throw std::system_error(make_error_code(error::protocol_aborted));
383 }
384 execute_open(open_t::connected);
385 }).handle_exception([this] (std::exception_ptr eptr) {
386 // TODO: handle fault in the connecting state
387 logger().warn("{} connecting fault: {}", conn, eptr);
388 close(true);
389 });
390 });
391 }
392
393 // accepting state
394
395 seastar::future<stop_t> ProtocolV1::send_connect_reply(
396 msgr_tag_t tag, bufferlist&& authorizer_reply)
397 {
398 h.reply.tag = tag;
399 h.reply.features = static_cast<uint64_t>((h.connect.features &
400 conn.policy.features_supported) |
401 conn.policy.features_required);
402 h.reply.authorizer_len = authorizer_reply.length();
403 return socket->write(make_static_packet(h.reply))
404 .then([this, reply=std::move(authorizer_reply)]() mutable {
405 return socket->write_flush(std::move(reply));
406 }).then([] {
407 return stop_t::no;
408 });
409 }
410
411 seastar::future<stop_t> ProtocolV1::send_connect_reply_ready(
412 msgr_tag_t tag, bufferlist&& authorizer_reply)
413 {
414 return messenger.get_global_seq(
415 ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) {
416 h.global_seq = gs;
417 h.reply.tag = tag;
418 h.reply.features = conn.policy.features_supported;
419 h.reply.global_seq = h.global_seq;
420 h.reply.connect_seq = h.connect_seq;
421 h.reply.flags = 0;
422 if (conn.policy.lossy) {
423 h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
424 }
425 h.reply.authorizer_len = auth_len;
426
427 session_security.reset(
428 get_auth_session_handler(nullptr,
429 auth_meta->auth_method,
430 auth_meta->session_key,
431 conn.features));
432
433 return socket->write(make_static_packet(h.reply));
434 }).then([this, reply=std::move(authorizer_reply)]() mutable {
435 if (reply.length()) {
436 return socket->write(std::move(reply));
437 } else {
438 return seastar::now();
439 }
440 }).then([this] {
441 if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
442 return socket->write_flush(make_static_packet(conn.in_seq))
443 .then([this] {
444 return socket->read_exactly(sizeof(seq_num_t));
445 }).then([this] (auto buf) {
446 auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
447 discard_up_to(&conn.out_q, *acked_seq);
448 });
449 } else {
450 return socket->flush();
451 }
452 }).then([] {
453 return stop_t::yes;
454 });
455 }
456
457 seastar::future<stop_t> ProtocolV1::replace_existing(
458 SocketConnectionRef existing,
459 bufferlist&& authorizer_reply,
460 bool is_reset_from_peer)
461 {
462 msgr_tag_t reply_tag;
463 if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
464 !is_reset_from_peer) {
465 reply_tag = CEPH_MSGR_TAG_SEQ;
466 } else {
467 reply_tag = CEPH_MSGR_TAG_READY;
468 }
469 if (!existing->is_lossy()) {
470 // XXX: we decided not to support lossless connection in v1. as the
471 // client's default policy is
472 // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
473 // lossy. And by the time
474 // will all be performed using v2 protocol.
475 ceph_abort("lossless policy not supported for v1");
476 }
477 existing->protocol->close(true);
478 return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
479 }
480
481 seastar::future<stop_t> ProtocolV1::handle_connect_with_existing(
482 SocketConnectionRef existing, bufferlist&& authorizer_reply)
483 {
484 ProtocolV1 *exproto = dynamic_cast<ProtocolV1*>(existing->protocol.get());
485
486 if (h.connect.global_seq < exproto->peer_global_seq()) {
487 h.reply.global_seq = exproto->peer_global_seq();
488 return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
489 } else if (existing->is_lossy()) {
490 return replace_existing(existing, std::move(authorizer_reply));
491 } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) {
492 return replace_existing(existing, std::move(authorizer_reply), true);
493 } else if (h.connect.connect_seq < exproto->connect_seq()) {
494 // old attempt, or we sent READY but they didn't get it.
495 h.reply.connect_seq = exproto->connect_seq() + 1;
496 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
497 } else if (h.connect.connect_seq == exproto->connect_seq()) {
498 // if the existing connection successfully opened, and/or
499 // subsequently went to standby, then the peer should bump
500 // their connect_seq and retry: this is not a connection race
501 // we need to resolve here.
502 if (exproto->get_state() == state_t::open ||
503 exproto->get_state() == state_t::standby) {
504 if (conn.policy.resetcheck && exproto->connect_seq() == 0) {
505 return replace_existing(existing, std::move(authorizer_reply));
506 } else {
507 h.reply.connect_seq = exproto->connect_seq() + 1;
508 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
509 }
510 } else if (existing->peer_wins()) {
511 return replace_existing(existing, std::move(authorizer_reply));
512 } else {
513 return send_connect_reply(CEPH_MSGR_TAG_WAIT);
514 }
515 } else if (conn.policy.resetcheck &&
516 exproto->connect_seq() == 0) {
517 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
518 } else {
519 return replace_existing(existing, std::move(authorizer_reply));
520 }
521 }
522
523 bool ProtocolV1::require_auth_feature() const
524 {
525 if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
526 return false;
527 }
528 if (local_conf()->cephx_require_signatures) {
529 return true;
530 }
531 if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
532 h.connect.host_type == CEPH_ENTITY_TYPE_MDS ||
533 h.connect.host_type == CEPH_ENTITY_TYPE_MGR) {
534 return local_conf()->cephx_cluster_require_signatures;
535 } else {
536 return local_conf()->cephx_service_require_signatures;
537 }
538 }
539
540 bool ProtocolV1::require_cephx_v2_feature() const
541 {
542 if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
543 return false;
544 }
545 if (local_conf()->cephx_require_version >= 2) {
546 return true;
547 }
548 if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
549 h.connect.host_type == CEPH_ENTITY_TYPE_MDS ||
550 h.connect.host_type == CEPH_ENTITY_TYPE_MGR) {
551 return local_conf()->cephx_cluster_require_version >= 2;
552 } else {
553 return local_conf()->cephx_service_require_version >= 2;
554 }
555 }
556
557 seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
558 {
559 return socket->read(sizeof(h.connect))
560 .then([this](bufferlist bl) {
561 auto p = bl.cbegin();
562 ::decode(h.connect, p);
563 if (conn.get_peer_type() != 0 &&
564 conn.get_peer_type() != h.connect.host_type) {
565 logger().error("{} repeat_handle_connect(): my peer type does not match"
566 " what peer advertises {} != {}",
567 conn, conn.get_peer_type(), h.connect.host_type);
568 throw std::system_error(make_error_code(error::protocol_aborted));
569 }
570 conn.set_peer_type(h.connect.host_type);
571 conn.policy = messenger.get_policy(h.connect.host_type);
572 if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
573 logger().error("{} we don't know how to reconnect to peer {}",
574 conn, conn.target_addr);
575 throw std::system_error(
576 make_error_code(crimson::net::error::bad_peer_address));
577 }
578 return socket->read(h.connect.authorizer_len);
579 }).then([this] (bufferlist authorizer) {
580 memset(&h.reply, 0, sizeof(h.reply));
581 // TODO: set reply.protocol_version
582 if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
583 return send_connect_reply(
584 CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
585 }
586 if (require_auth_feature()) {
587 conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH;
588 }
589 if (require_cephx_v2_feature()) {
590 conn.policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
591 }
592 if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features;
593 feat_missing != 0) {
594 return send_connect_reply(
595 CEPH_MSGR_TAG_FEATURES, bufferlist{});
596 }
597
598 bufferlist authorizer_reply;
599 auth_meta->auth_method = h.connect.authorizer_protocol;
600 if (!HAVE_FEATURE((uint64_t)h.connect.features, CEPHX_V2)) {
601 // peer doesn't support it and we won't get here if we require it
602 auth_meta->skip_authorizer_challenge = true;
603 }
604 auto more = static_cast<bool>(auth_meta->authorizer_challenge);
605 ceph_assert(messenger.get_auth_server());
606 int r = messenger.get_auth_server()->handle_auth_request(
607 conn.shared_from_this(), auth_meta, more, auth_meta->auth_method, authorizer,
608 &authorizer_reply);
609
610 if (r < 0) {
611 session_security.reset();
612 return send_connect_reply(
613 CEPH_MSGR_TAG_BADAUTHORIZER, std::move(authorizer_reply));
614 } else if (r == 0) {
615 ceph_assert(authorizer_reply.length());
616 return send_connect_reply(
617 CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, std::move(authorizer_reply));
618 }
619
620 // r > 0
621 if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) {
622 if (existing->protocol->proto_type != proto_t::v1) {
623 logger().warn("{} existing {} proto version is {} not 1, close existing",
624 conn, *existing,
625 static_cast<int>(existing->protocol->proto_type));
626 // NOTE: this is following async messenger logic, but we may miss the reset event.
627 existing->mark_down();
628 } else {
629 return handle_connect_with_existing(existing, std::move(authorizer_reply));
630 }
631 }
632 if (h.connect.connect_seq > 0) {
633 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
634 std::move(authorizer_reply));
635 }
636 h.connect_seq = h.connect.connect_seq + 1;
637 h.peer_global_seq = h.connect.global_seq;
638 conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features);
639 // TODO: cct
640 return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
641 });
642 }
643
644 void ProtocolV1::start_accept(SocketRef&& sock,
645 const entity_addr_t& _peer_addr)
646 {
647 ceph_assert(state == state_t::none);
648 logger().trace("{} trigger accepting, was {}",
649 conn, static_cast<int>(state));
650 state = state_t::accepting;
651 set_write_state(write_state_t::delay);
652
653 ceph_assert(!socket);
654 // until we know better
655 conn.target_addr = _peer_addr;
656 socket = std::move(sock);
657 messenger.accept_conn(
658 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
659 gate.dispatch_in_background("start_accept", *this, [this] {
660 // stop learning my_addr before sending it out, so it won't change
661 return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
662 // encode/send server's handshake header
663 bufferlist bl;
664 bl.append(buffer::create_static(banner_size, banner));
665 ::encode(messenger.get_myaddr(), bl, 0);
666 ::encode(conn.target_addr, bl, 0);
667 return socket->write_flush(std::move(bl));
668 }).then([this] {
669 // read client's handshake header and connect request
670 return socket->read(client_header_size);
671 }).then([this] (bufferlist bl) {
672 auto p = bl.cbegin();
673 validate_banner(p);
674 entity_addr_t addr;
675 ::decode(addr, p);
676 ceph_assert(p.end());
677 if ((addr.is_legacy() || addr.is_any()) &&
678 addr.is_same_host(conn.target_addr)) {
679 // good
680 } else {
681 logger().error("{} peer advertized an invalid peer_addr: {},"
682 " which should be v1 and the same host with {}.",
683 conn, addr, conn.peer_addr);
684 throw std::system_error(
685 make_error_code(crimson::net::error::bad_peer_address));
686 }
687 conn.peer_addr = addr;
688 conn.target_addr = conn.peer_addr;
689 return seastar::repeat([this] {
690 return repeat_handle_connect();
691 });
692 }).then([this] {
693 if (state != state_t::accepting) {
694 assert(state == state_t::closing);
695 throw std::system_error(make_error_code(error::protocol_aborted));
696 }
697 messenger.register_conn(
698 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
699 messenger.unaccept_conn(
700 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
701 execute_open(open_t::accepted);
702 }).handle_exception([this] (std::exception_ptr eptr) {
703 // TODO: handle fault in the accepting state
704 logger().warn("{} accepting fault: {}", conn, eptr);
705 close(false);
706 });
707 });
708 }
709
710 // open state
711
712 ceph::bufferlist ProtocolV1::do_sweep_messages(
713 const std::deque<MessageRef>& msgs,
714 size_t num_msgs,
715 bool require_keepalive,
716 std::optional<utime_t> _keepalive_ack,
717 bool require_ack)
718 {
719 static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
720 sizeof(ceph_msg_header) +
721 sizeof(ceph_msg_footer);
722 static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) +
723 sizeof(ceph_msg_header) +
724 sizeof(ceph_msg_footer_old);
725
726 ceph::bufferlist bl;
727 if (likely(num_msgs)) {
728 if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
729 bl.reserve(num_msgs * RESERVE_MSG_SIZE);
730 } else {
731 bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD);
732 }
733 }
734
735 if (unlikely(require_keepalive)) {
736 k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
737 ceph::coarse_real_clock::now());
738 logger().trace("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
739 bl.append(create_static(k.req));
740 }
741
742 if (unlikely(_keepalive_ack.has_value())) {
743 logger().trace("{} write keepalive2 ack {}", conn, *_keepalive_ack);
744 k.ack.stamp = ceph_timespec(*_keepalive_ack);
745 bl.append(create_static(k.ack));
746 }
747
748 if (require_ack) {
749 // XXX: we decided not to support lossless connection in v1. as the
750 // client's default policy is
751 // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
752 // lossy. And by the time of crimson-osd's GA, the in-cluster communication
753 // will all be performed using v2 protocol.
754 ceph_abort("lossless policy not supported for v1");
755 }
756
757 std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
758 ceph_assert(!msg->get_seq() && "message already has seq");
759 msg->set_seq(++conn.out_seq);
760 auto& header = msg->get_header();
761 header.src = messenger.get_myname();
762 msg->encode(conn.features, messenger.get_crc_flags());
763 if (session_security) {
764 session_security->sign_message(msg.get());
765 }
766 logger().debug("{} --> #{} === {} ({})",
767 conn, msg->get_seq(), *msg, msg->get_type());
768 bl.append(CEPH_MSGR_TAG_MSG);
769 bl.append((const char*)&header, sizeof(header));
770 bl.append(msg->get_payload());
771 bl.append(msg->get_middle());
772 bl.append(msg->get_data());
773 auto& footer = msg->get_footer();
774 if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
775 bl.append((const char*)&footer, sizeof(footer));
776 } else {
777 ceph_msg_footer_old old_footer;
778 if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
779 old_footer.front_crc = footer.front_crc;
780 old_footer.middle_crc = footer.middle_crc;
781 } else {
782 old_footer.front_crc = old_footer.middle_crc = 0;
783 }
784 if (messenger.get_crc_flags() & MSG_CRC_DATA) {
785 old_footer.data_crc = footer.data_crc;
786 } else {
787 old_footer.data_crc = 0;
788 }
789 old_footer.flags = footer.flags;
790 bl.append((const char*)&old_footer, sizeof(old_footer));
791 }
792 });
793
794 return bl;
795 }
796
797 seastar::future<> ProtocolV1::handle_keepalive2_ack()
798 {
799 return socket->read_exactly(sizeof(ceph_timespec))
800 .then([this] (auto buf) {
801 auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
802 k.ack_stamp = *t;
803 logger().trace("{} got keepalive2 ack {}", conn, t->tv_sec);
804 });
805 }
806
807 seastar::future<> ProtocolV1::handle_keepalive2()
808 {
809 return socket->read_exactly(sizeof(ceph_timespec))
810 .then([this] (auto buf) {
811 utime_t ack{*reinterpret_cast<const ceph_timespec*>(buf.get())};
812 notify_keepalive_ack(ack);
813 });
814 }
815
816 seastar::future<> ProtocolV1::handle_ack()
817 {
818 return socket->read_exactly(sizeof(ceph_le64))
819 .then([this] (auto buf) {
820 auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
821 discard_up_to(&conn.sent, *seq);
822 });
823 }
824
825 seastar::future<> ProtocolV1::maybe_throttle()
826 {
827 if (!conn.policy.throttler_bytes) {
828 return seastar::now();
829 }
830 const auto to_read = (m.header.front_len +
831 m.header.middle_len +
832 m.header.data_len);
833 return conn.policy.throttler_bytes->get(to_read);
834 }
835
836 seastar::future<> ProtocolV1::read_message()
837 {
838 return socket->read(sizeof(m.header))
839 .then([this] (bufferlist bl) {
840 // throttle the traffic, maybe
841 auto p = bl.cbegin();
842 ::decode(m.header, p);
843 return maybe_throttle();
844 }).then([this] {
845 // read front
846 return socket->read(m.header.front_len);
847 }).then([this] (bufferlist bl) {
848 m.front = std::move(bl);
849 // read middle
850 return socket->read(m.header.middle_len);
851 }).then([this] (bufferlist bl) {
852 m.middle = std::move(bl);
853 // read data
854 return socket->read(m.header.data_len);
855 }).then([this] (bufferlist bl) {
856 m.data = std::move(bl);
857 // read footer
858 return socket->read(sizeof(m.footer));
859 }).then([this] (bufferlist bl) {
860 auto p = bl.cbegin();
861 ::decode(m.footer, p);
862 auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
863 conn.shared_from_this());
864 auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
865 m.front, m.middle, m.data, conn_ref);
866 if (unlikely(!msg)) {
867 logger().warn("{} decode message failed", conn);
868 throw std::system_error{make_error_code(error::corrupted_message)};
869 }
870 constexpr bool add_ref = false; // Message starts with 1 ref
871 // TODO: change MessageRef with foreign_ptr
872 auto msg_ref = MessageRef{msg, add_ref};
873
874 if (session_security) {
875 if (unlikely(session_security->check_message_signature(msg))) {
876 logger().warn("{} message signature check failed", conn);
877 throw std::system_error{make_error_code(error::corrupted_message)};
878 }
879 }
880 // TODO: set time stamps
881 msg->set_byte_throttler(conn.policy.throttler_bytes);
882
883 if (unlikely(!conn.update_rx_seq(msg->get_seq()))) {
884 // skip this message
885 return seastar::now();
886 }
887
888 logger().debug("{} <== #{} === {} ({})",
889 conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type());
890 // throttle the reading process by the returned future
891 return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
892 });
893 }
894
895 seastar::future<> ProtocolV1::handle_tags()
896 {
897 return seastar::keep_doing([this] {
898 // read the next tag
899 return socket->read_exactly(1)
900 .then([this] (auto buf) {
901 switch (buf[0]) {
902 case CEPH_MSGR_TAG_MSG:
903 return read_message();
904 case CEPH_MSGR_TAG_ACK:
905 return handle_ack();
906 case CEPH_MSGR_TAG_KEEPALIVE:
907 return seastar::now();
908 case CEPH_MSGR_TAG_KEEPALIVE2:
909 return handle_keepalive2();
910 case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
911 return handle_keepalive2_ack();
912 case CEPH_MSGR_TAG_CLOSE:
913 logger().info("{} got tag close", conn);
914 throw std::system_error(make_error_code(error::protocol_aborted));
915 default:
916 logger().error("{} got unknown msgr tag {}",
917 conn, static_cast<int>(buf[0]));
918 throw std::system_error(make_error_code(error::read_eof));
919 }
920 });
921 });
922 }
923
924 void ProtocolV1::execute_open(open_t type)
925 {
926 logger().trace("{} trigger open, was {}", conn, static_cast<int>(state));
927 state = state_t::open;
928 set_write_state(write_state_t::open);
929
930 if (type == open_t::connected) {
931 dispatchers.ms_handle_connect(
932 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
933 } else { // type == open_t::accepted
934 dispatchers.ms_handle_accept(
935 seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
936 }
937
938 gate.dispatch_in_background("execute_open", *this, [this] {
939 // start background processing of tags
940 return handle_tags()
941 .handle_exception_type([this] (const std::system_error& e) {
942 logger().warn("{} open fault: {}", conn, e);
943 if (e.code() == error::protocol_aborted ||
944 e.code() == std::errc::connection_reset ||
945 e.code() == error::read_eof) {
946 close(true);
947 return seastar::now();
948 } else {
949 throw e;
950 }
951 }).handle_exception([this] (std::exception_ptr eptr) {
952 // TODO: handle fault in the open state
953 logger().warn("{} open fault: {}", conn, eptr);
954 close(true);
955 });
956 });
957 }
958
959 // closing state
960
961 void ProtocolV1::trigger_close()
962 {
963 logger().trace("{} trigger closing, was {}",
964 conn, static_cast<int>(state));
965 messenger.closing_conn(
966 seastar::static_pointer_cast<SocketConnection>(
967 conn.shared_from_this()));
968
969 if (state == state_t::accepting) {
970 messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(
971 conn.shared_from_this()));
972 } else if (state >= state_t::connecting && state < state_t::closing) {
973 messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
974 conn.shared_from_this()));
975 } else {
976 // cannot happen
977 ceph_assert(false);
978 }
979
980 if (!socket) {
981 ceph_assert(state == state_t::connecting);
982 }
983
984 state = state_t::closing;
985 }
986
987 void ProtocolV1::on_closed()
988 {
989 messenger.closed_conn(
990 seastar::static_pointer_cast<SocketConnection>(
991 conn.shared_from_this()));
992 }
993
994 seastar::future<> ProtocolV1::fault()
995 {
996 if (conn.policy.lossy) {
997 messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(
998 conn.shared_from_this()));
999 }
1000 // XXX: we decided not to support lossless connection in v1. as the
1001 // client's default policy is
1002 // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
1003 // lossy. And by the time of crimson-osd's GA, the in-cluster communication
1004 // will all be performed using v2 protocol.
1005 ceph_abort("lossless policy not supported for v1");
1006 return seastar::now();
1007 }
1008
1009 void ProtocolV1::print(std::ostream& out) const
1010 {
1011 out << conn;
1012 }
1013
1014 } // namespace crimson::net