1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV1.h"
6 #include <seastar/core/shared_future.hh>
7 #include <seastar/core/sleep.hh>
8 #include <seastar/net/packet.hh>
10 #include "include/msgr.h"
11 #include "include/random.h"
12 #include "auth/Auth.h"
13 #include "auth/AuthSessionHandler.h"
15 #include "crimson/auth/AuthClient.h"
16 #include "crimson/auth/AuthServer.h"
17 #include "crimson/common/log.h"
18 #include "chained_dispatchers.h"
21 #include "SocketConnection.h"
22 #include "SocketMessenger.h"
// Declare raw encoders for the two v1 handshake structs; both types are
// passed to ::decode() further down in this file.
24 WRITE_RAW_ENCODER(ceph_msg_connect
);
25 WRITE_RAW_ENCODER(ceph_msg_connect_reply
);
// local_conf() is used by the cephx requirement checks in this file.
27 using crimson::common::local_conf
;
// Stream a one-line, human-readable summary of a ceph_msg_connect to `out`
// and return `out`.
// features and flags are printed in hex; the stream is switched back to
// decimal after each of them.
29 std::ostream
& operator<<(std::ostream
& out
, const ceph_msg_connect
& c
)
31 return out
<< "connect{features=" << std::hex
<< c
.features
<< std::dec
32 << " host_type=" << c
.host_type
33 << " global_seq=" << c
.global_seq
34 << " connect_seq=" << c
.connect_seq
35 << " protocol_version=" << c
.protocol_version
36 << " authorizer_protocol=" << c
.authorizer_protocol
37 << " authorizer_len=" << c
.authorizer_len
// flags is widened to uint16_t before printing, presumably so that it is
// formatted as a number rather than as a character
38 << " flags=" << std::hex
<< static_cast<uint16_t>(c
.flags
) << std::dec
<< '}';
// Stream a one-line, human-readable summary of a ceph_msg_connect_reply to
// `out` and return `out`.
// tag and flags are widened to uint16_t before printing; features and flags
// are printed in hex.
41 std::ostream
& operator<<(std::ostream
& out
, const ceph_msg_connect_reply
& r
)
43 return out
<< "connect_reply{tag=" << static_cast<uint16_t>(r
.tag
)
44 << " features=" << std::hex
<< r
.features
<< std::dec
45 << " global_seq=" << r
.global_seq
46 << " connect_seq=" << r
.connect_seq
47 << " protocol_version=" << r
.protocol_version
48 << " authorizer_len=" << r
.authorizer_len
49 << " flags=" << std::hex
<< static_cast<uint16_t>(r
.flags
) << std::dec
<< '}';
// Return the logger obtained from crimson::get_logger() for ceph_subsys_ms;
// every log statement in this file goes through it.
54 seastar::logger
& logger() {
55 return crimson::get_logger(ceph_subsys_ms
);
// Build a seastar::net::packet from the address of `value` and sizeof(value).
// (T is a template parameter; its declaration line is not visible in this
// listing.)
// NOTE(review): the packet is constructed from a pointer into `value`;
// presumably `value` has to stay alive until the write completes -- confirm
// against the seastar::net::packet constructor that is selected here.
59 seastar::net::packet
make_static_packet(const T
& value
) {
60 return { reinterpret_cast<const char*>(&value
), sizeof(value
) };
63 // store the banner in a non-const string for buffer::create_static()
64 char banner
[] = CEPH_BANNER
;
// length of the banner without the terminating NUL of the literal
65 constexpr size_t banner_size
= sizeof(CEPH_BANNER
)-1;
// size of the handshake header sent by the client: banner + one address
67 constexpr size_t client_header_size
= banner_size
+ sizeof(ceph_entity_addr
);
// size of the handshake header sent by the server: banner + two addresses
68 constexpr size_t server_header_size
= banner_size
+ 2 * sizeof(ceph_entity_addr
);
70 // check that the buffer starts with a valid banner without requiring it to
71 // be contiguous in memory
//
// `p` is advanced by get_ptr_and_advance() while the banner is compared.
// Throws std::system_error(error::bad_connect_banner) when the bytes read
// differ from the expected banner.
// NOTE(review): the lines between the visible statements (loop header and
// the advance of `b`) are not part of this listing.
72 void validate_banner(bufferlist::const_iterator
& p
)
74 auto b
= std::cbegin(banner
);
75 auto end
= b
+ banner_size
;
77 const char *buf
{nullptr};
// ask for everything that is still unchecked; `len` is what
// get_ptr_and_advance() reports back
78 auto remaining
= std::distance(b
, end
);
79 auto len
= p
.get_ptr_and_advance(remaining
, &buf
);
// compare the `len` bytes at `buf` with the banner starting at `b`
80 if (!std::equal(buf
, buf
+ len
, b
)) {
81 throw std::system_error(
82 make_error_code(crimson::net::error::bad_connect_banner
));
88 // return a static bufferptr to the given object
//
// The bufferptr is created with buffer::create_static() over the address of
// `obj` and sizeof(obj).
// NOTE(review): presumably `obj` must outlive the returned bufferptr --
// confirm against buffer::create_static().
90 bufferptr
create_static(T
& obj
)
92 return buffer::create_static(sizeof(obj
), reinterpret_cast<char*>(&obj
));
// Return the protocol version to use with a peer of type `peer_type`.
// `connect` is true when called from the connecting side (repeat_connect)
// and false when called from the accepting side (repeat_handle_connect).
// The local entity type is fixed to CEPH_ENTITY_TYPE_OSD.
95 uint32_t get_proto_version(entity_type_t peer_type
, bool connect
)
97 constexpr entity_type_t my_type
= CEPH_ENTITY_TYPE_OSD
;
98 // see also OSD.h, unlike other connection of simple/async messenger,
99 // crimson msgr is only used by osd
100 constexpr uint32_t CEPH_OSD_PROTOCOL
= 10;
// peer has the same type as us: use the OSD protocol version
101 if (peer_type
== my_type
) {
103 return CEPH_OSD_PROTOCOL
;
// otherwise the version is selected by the peer's type when connecting and
// by our own type when accepting
// NOTE(review): no default arm is visible in this listing
106 switch (connect
? peer_type
: my_type
) {
107 case CEPH_ENTITY_TYPE_OSD
: return CEPH_OSDC_PROTOCOL
;
108 case CEPH_ENTITY_TYPE_MDS
: return CEPH_MDSC_PROTOCOL
;
109 case CEPH_ENTITY_TYPE_MON
: return CEPH_MONC_PROTOCOL
;
// Walk `queue` from the front while it is non-empty and the front message's
// sequence number is below `seq`.
// NOTE(review): the loop body is not visible in this listing; going by the
// function name it presumably pops the front element -- confirm.
115 void discard_up_to(std::deque
<MessageRef
>* queue
,
116 crimson::net::seq_num_t seq
)
118 while (!queue
->empty() &&
119 queue
->front()->get_seq() < seq
) {
124 } // namespace anonymous
126 namespace crimson::net
{
// Construct a v1 protocol object: forwards proto_t::v1, `dispatchers` and
// `conn` to the Protocol base and keeps a reference to `messenger`.
128 ProtocolV1::ProtocolV1(ChainedDispatchers
& dispatchers
,
129 SocketConnection
& conn
,
130 SocketMessenger
& messenger
)
131 : Protocol(proto_t::v1
, dispatchers
, conn
), messenger
{messenger
} {}
// Nothing to release explicitly here.
133 ProtocolV1::~ProtocolV1() {}
// True only while the protocol is in state_t::open.
135 bool ProtocolV1::is_connected() const
137 return state
== state_t::open
;
// Reset session state.
// NOTE(review): only the out_seq handling is visible in this listing; the
// statements before it and the branch for peers without MSG_AUTH are not.
142 void ProtocolV1::reset_session()
148 if (HAVE_FEATURE(conn
.features
, MSG_AUTH
)) {
149 // Set out_seq to a random value, so CRC won't be predictable.
150 // Constant to limit starting sequence number to 2^31. Nothing special
151 // about it, just a big number.
152 constexpr uint64_t SEQ_MASK
= 0x7fffffff;
153 conn
.out_seq
= ceph::util::generate_random_number
<uint64_t>(0, SEQ_MASK
);
155 // previously, seq #'s always started at 0.
// Act on the tag of the server's connect reply (h.reply / h.auth_payload
// have been filled in by repeat_connect()).
//
// Resolves to stop_t::no when another connect round is needed (challenge,
// RESETSESSION, RETRY_GLOBAL, RETRY_SESSION) and to stop_t::yes once the
// SEQ/READY path has finished. Throws
// std::system_error(error::negotiation_failure) for FEATURES, BADPROTOVER,
// BADAUTHORIZER, WAIT, missing required features, unknown tags and after the
// handle_auth_done() warning below.
// NOTE(review): several lines (e.g. the switch header and some branch
// bodies) are not part of this listing.
160 seastar::future
<stop_t
>
161 ProtocolV1::handle_connect_reply(msgr_tag_t tag
)
// the auth payload is only processed here for non-mon peers
163 if (h
.auth_payload
.length() && !conn
.peer_is_mon()) {
164 if (tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
) { // more
// keep the challenge response; get_auth_payload() sends it in the next round
165 h
.auth_more
= messenger
.get_auth_client()->handle_auth_reply_more(
166 conn
.shared_from_this(), auth_meta
, h
.auth_payload
);
167 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
169 int ret
= messenger
.get_auth_client()->handle_auth_done(
170 conn
.shared_from_this(), auth_meta
, 0, 0, h
.auth_payload
);
173 logger().warn("{} AuthClient::handle_auth_done() return {}", conn
, ret
);
174 throw std::system_error(make_error_code(error::negotiation_failure
));
// NOTE(review): "mispatch" in the two messages below looks like a typo for
// "mismatch"; these messages also log __func__ where others log conn.
180 case CEPH_MSGR_TAG_FEATURES
:
181 logger().error("{} connect protocol feature mispatch", __func__
);
182 throw std::system_error(make_error_code(error::negotiation_failure
));
183 case CEPH_MSGR_TAG_BADPROTOVER
:
184 logger().error("{} connect protocol version mispatch", __func__
);
185 throw std::system_error(make_error_code(error::negotiation_failure
));
186 case CEPH_MSGR_TAG_BADAUTHORIZER
:
187 logger().error("{} got bad authorizer", __func__
);
188 throw std::system_error(make_error_code(error::negotiation_failure
));
189 case CEPH_MSGR_TAG_RESETSESSION
:
191 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
192 case CEPH_MSGR_TAG_RETRY_GLOBAL
:
// obtain a global_seq based on the one the peer replied with, then retry
193 return messenger
.get_global_seq(h
.reply
.global_seq
).then([this] (auto gs
) {
195 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
197 case CEPH_MSGR_TAG_RETRY_SESSION
:
// the peer must have replied with a larger connect_seq; adopt it and retry
198 ceph_assert(h
.reply
.connect_seq
> h
.connect_seq
);
199 h
.connect_seq
= h
.reply
.connect_seq
;
200 return seastar::make_ready_future
<stop_t
>(stop_t::no
);
201 case CEPH_MSGR_TAG_WAIT
:
203 throw std::system_error(make_error_code(error::negotiation_failure
));
204 case CEPH_MSGR_TAG_SEQ
:
205 case CEPH_MSGR_TAG_READY
:
// features we require that the peer's reply does not advertise
206 if (auto missing
= (conn
.policy
.features_required
& ~(uint64_t)h
.reply
.features
);
208 logger().error("{} missing required features", __func__
);
209 throw std::system_error(make_error_code(error::negotiation_failure
));
211 return seastar::futurize_invoke([this, tag
] {
212 if (tag
== CEPH_MSGR_TAG_SEQ
) {
// SEQ: read the peer's acked sequence number, discard from conn.out_q up to
// it, then send our in_seq
213 return socket
->read_exactly(sizeof(seq_num_t
))
214 .then([this] (auto buf
) {
// NOTE(review): the received bytes are read through a seq_num_t pointer;
// this assumes buf.get() is suitably aligned -- confirm
215 auto acked_seq
= reinterpret_cast<const seq_num_t
*>(buf
.get());
216 discard_up_to(&conn
.out_q
, *acked_seq
);
217 return socket
->write_flush(make_static_packet(conn
.in_seq
));
220 // tag CEPH_MSGR_TAG_READY
221 return seastar::now();
// record what the peer decided: its global_seq and whether the connection is
// lossy
224 h
.peer_global_seq
= h
.reply
.global_seq
;
225 conn
.policy
.lossy
= h
.reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
// effective features: those in both the reply and our connect message
228 conn
.set_features(h
.reply
.features
& h
.connect
.features
);
229 if (auth_meta
->authorizer
) {
230 session_security
.reset(
231 get_auth_session_handler(nullptr,
232 auth_meta
->authorizer
->protocol
,
233 auth_meta
->session_key
,
236 session_security
.reset();
238 return seastar::make_ready_future
<stop_t
>(stop_t::yes
);
// NOTE(review): the format string has one placeholder but two arguments are
// passed (__func__ and the tag), so the tag value is presumably not rendered
// -- confirm and consider "{} got unknown tag {}".
243 logger().error("{} got unknown tag", __func__
, int(tag
));
244 throw std::system_error(make_error_code(error::negotiation_failure
));
// Produce the auth payload to attach to the next connect message.
// If a challenge response was stored in h.auth_more it is moved out and
// returned; otherwise a request is obtained from the auth client and its
// auth method is recorded in auth_meta.
// NOTE(review): the body of the mon branch and the final return are not
// visible in this listing.
248 ceph::bufferlist
ProtocolV1::get_auth_payload()
250 // only non-mons connectings to mons use MAuth messages
251 if (conn
.peer_is_mon() &&
252 messenger
.get_mytype() != CEPH_ENTITY_TYPE_MON
) {
255 if (h
.auth_more
.length()) {
256 logger().info("using augmented (challenge) auth payload");
// moving from the member leaves h.auth_more in a moved-from state
257 return std::move(h
.auth_more
);
259 auto [auth_method
, preferred_modes
, auth_bl
] =
260 messenger
.get_auth_client()->get_auth_request(
261 conn
.shared_from_this(), auth_meta
);
262 auth_meta
->auth_method
= auth_method
;
// One round of the client-side connect negotiation; driven by
// seastar::repeat() in start_connect().
// Fills h.connect, sends it (followed by the auth payload, if any), reads
// the reply struct and the authorizer bytes announced in it, and passes the
// reply tag to handle_connect_reply(), whose stop_t result is returned.
268 seastar::future
<stop_t
>
269 ProtocolV1::repeat_connect()
271 // encode ceph_msg_connect
272 memset(&h
.connect
, 0, sizeof(h
.connect
));
273 h
.connect
.features
= conn
.policy
.features_supported
;
274 h
.connect
.host_type
= messenger
.get_myname().type();
275 h
.connect
.global_seq
= h
.global_seq
;
276 h
.connect
.connect_seq
= h
.connect_seq
;
277 h
.connect
.protocol_version
= get_proto_version(conn
.get_peer_type(), true);
278 // this is fyi, actually, server decides!
279 h
.connect
.flags
= conn
.policy
.lossy
? CEPH_MSG_CONNECT_LOSSY
: 0;
281 ceph_assert(messenger
.get_auth_client());
284 bufferlist auth_bl
= get_auth_payload();
// with an auth payload: announce its protocol and length in the header and
// append the payload after it
285 if (auth_bl
.length()) {
286 h
.connect
.authorizer_protocol
= auth_meta
->auth_method
;
287 h
.connect
.authorizer_len
= auth_bl
.length();
288 bl
.append(create_static(h
.connect
));
289 bl
.claim_append(auth_bl
);
// without one: send the header alone with zeroed authorizer fields
291 h
.connect
.authorizer_protocol
= 0;
292 h
.connect
.authorizer_len
= 0;
293 bl
.append(create_static(h
.connect
));
295 return socket
->write_flush(std::move(bl
))
// read the fixed-size reply struct
298 return socket
->read(sizeof(h
.reply
));
299 }).then([this] (bufferlist bl
) {
300 auto p
= bl
.cbegin();
301 ::decode(h
.reply
, p
);
302 ceph_assert(p
.end());
// then read as many authorizer bytes as the reply announces
303 return socket
->read(h
.reply
.authorizer_len
);
304 }).then([this] (bufferlist bl
) {
305 h
.auth_payload
= std::move(bl
);
306 return handle_connect_reply(h
.reply
.tag
);
// Start connecting to `_peer_addr` / `_peer_name`.
// Must be called in state_t::none with no socket. Moves to
// state_t::connecting, registers the connection with the messenger and runs
// the handshake in the background under `gate`: connect the socket, exchange
// handshake headers, repeat repeat_connect() until it stops, then
// execute_open(open_t::connected).
// Failures are logged in the handle_exception() continuation at the end.
310 void ProtocolV1::start_connect(const entity_addr_t
& _peer_addr
,
311 const entity_name_t
& _peer_name
)
313 ceph_assert(state
== state_t::none
);
314 logger().trace("{} trigger connecting, was {}", conn
, static_cast<int>(state
));
315 state
= state_t::connecting
;
316 set_write_state(write_state_t::delay
);
318 ceph_assert(!socket
);
319 ceph_assert(!gate
.is_closed());
320 conn
.peer_addr
= _peer_addr
;
321 conn
.target_addr
= _peer_addr
;
322 conn
.set_peer_name(_peer_name
);
323 conn
.policy
= messenger
.get_policy(_peer_name
.type());
324 messenger
.register_conn(
325 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
326 gate
.dispatch_in_background("start_connect", *this, [this] {
327 return Socket::connect(conn
.peer_addr
)
328 .then([this](SocketRef sock
) {
329 socket
= std::move(sock
);
// the state may have changed while connecting; only closing is expected, in
// which case the fresh socket is closed and the attempt is aborted
330 if (state
!= state_t::connecting
) {
331 assert(state
== state_t::closing
);
332 return socket
->close().then([] {
333 throw std::system_error(make_error_code(error::protocol_aborted
));
336 return seastar::now();
338 return messenger
.get_global_seq();
339 }).then([this] (auto gs
) {
341 // read server's handshake header
342 return socket
->read(server_header_size
);
343 }).then([this] (bufferlist headerbl
) {
344 auto p
= headerbl
.cbegin();
346 entity_addr_t saddr
, caddr
;
349 ceph_assert(p
.end());
// saddr is compared with the address we connected to
350 if (saddr
!= conn
.peer_addr
) {
351 logger().error("{} my peer_addr {} doesn't match what peer advertized {}",
352 conn
, conn
.peer_addr
, saddr
);
353 throw std::system_error(
354 make_error_code(crimson::net::error::bad_peer_address
));
356 if (state
!= state_t::connecting
) {
357 assert(state
== state_t::closing
);
358 throw std::system_error(make_error_code(error::protocol_aborted
));
// caddr is the address the peer reports for us; its port is passed on to the
// socket and a msgr2-typed address is rejected
360 socket
->learn_ephemeral_port_as_connector(caddr
.get_port());
361 if (unlikely(caddr
.is_msgr2())) {
362 logger().warn("{} peer sent a v2 address for me: {}",
364 throw std::system_error(
365 make_error_code(crimson::net::error::bad_peer_address
));
367 caddr
.set_type(entity_addr_t::TYPE_LEGACY
);
368 return messenger
.learned_addr(caddr
, conn
);
370 // encode/send client's handshake header
372 bl
.append(buffer::create_static(banner_size
, banner
));
373 ::encode(messenger
.get_myaddr(), bl
, 0);
374 return socket
->write_flush(std::move(bl
));
// negotiate until repeat_connect() resolves to stop_t::yes
376 return seastar::repeat([this] {
377 return repeat_connect();
380 if (state
!= state_t::connecting
) {
381 assert(state
== state_t::closing
);
382 throw std::system_error(make_error_code(error::protocol_aborted
));
384 execute_open(open_t::connected
);
385 }).handle_exception([this] (std::exception_ptr eptr
) {
386 // TODO: handle fault in the connecting state
387 logger().warn("{} connecting fault: {}", conn
, eptr
);
// Send a (non-final) connect reply to the peer: the h.reply struct followed
// by `authorizer_reply`, flushed with the second write.
// NOTE(review): where `tag` is stored into h.reply and the value the future
// resolves to are not visible in this listing.
395 seastar::future
<stop_t
> ProtocolV1::send_connect_reply(
396 msgr_tag_t tag
, bufferlist
&& authorizer_reply
)
// advertise the features supported by both sides plus everything we require
399 h
.reply
.features
= static_cast<uint64_t>((h
.connect
.features
&
400 conn
.policy
.features_supported
) |
401 conn
.policy
.features_required
);
402 h
.reply
.authorizer_len
= authorizer_reply
.length();
403 return socket
->write(make_static_packet(h
.reply
))
// the authorizer reply is moved into the continuation so it outlives this call
404 .then([this, reply
=std::move(authorizer_reply
)]() mutable {
405 return socket
->write_flush(std::move(reply
));
// Send the final connect reply (callers pass CEPH_MSGR_TAG_READY or
// CEPH_MSGR_TAG_SEQ) after obtaining a global_seq from the messenger.
// Fills h.reply, installs the session security handler, writes the reply
// struct and the authorizer reply, and for SEQ additionally exchanges
// sequence numbers with the peer.
// NOTE(review): some lines (e.g. where `gs` and `tag` are stored) are not
// visible in this listing.
411 seastar::future
<stop_t
> ProtocolV1::send_connect_reply_ready(
412 msgr_tag_t tag
, bufferlist
&& authorizer_reply
)
414 return messenger
.get_global_seq(
// only the length is captured here; the bufferlist itself is moved into a
// later continuation
415 ).then([this, tag
, auth_len
= authorizer_reply
.length()] (auto gs
) {
418 h
.reply
.features
= conn
.policy
.features_supported
;
419 h
.reply
.global_seq
= h
.global_seq
;
420 h
.reply
.connect_seq
= h
.connect_seq
;
422 if (conn
.policy
.lossy
) {
423 h
.reply
.flags
= h
.reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
425 h
.reply
.authorizer_len
= auth_len
;
427 session_security
.reset(
428 get_auth_session_handler(nullptr,
429 auth_meta
->auth_method
,
430 auth_meta
->session_key
,
433 return socket
->write(make_static_packet(h
.reply
));
434 }).then([this, reply
=std::move(authorizer_reply
)]() mutable {
// an empty authorizer reply is not written at all
435 if (reply
.length()) {
436 return socket
->write(std::move(reply
));
438 return seastar::now();
// SEQ: send our in_seq, read the peer's acked sequence number and discard
// from conn.out_q up to it
441 if (h
.reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
442 return socket
->write_flush(make_static_packet(conn
.in_seq
))
444 return socket
->read_exactly(sizeof(seq_num_t
));
445 }).then([this] (auto buf
) {
446 auto acked_seq
= reinterpret_cast<const seq_num_t
*>(buf
.get());
447 discard_up_to(&conn
.out_q
, *acked_seq
);
// otherwise only flush what has been written so far
450 return socket
->flush();
// Replace `existing` with this connection: close the existing connection's
// protocol and answer the peer via send_connect_reply_ready().
// The reply tag is CEPH_MSGR_TAG_SEQ when the peer advertises RECONNECT_SEQ
// and this is not a reset from the peer, otherwise CEPH_MSGR_TAG_READY.
// Aborts if `existing` is not lossy.
457 seastar::future
<stop_t
> ProtocolV1::replace_existing(
458 SocketConnectionRef existing
,
459 bufferlist
&& authorizer_reply
,
460 bool is_reset_from_peer
)
462 msgr_tag_t reply_tag
;
463 if (HAVE_FEATURE(h
.connect
.features
, RECONNECT_SEQ
) &&
464 !is_reset_from_peer
) {
465 reply_tag
= CEPH_MSGR_TAG_SEQ
;
467 reply_tag
= CEPH_MSGR_TAG_READY
;
469 if (!existing
->is_lossy()) {
470 // XXX: we decided not to support lossless connection in v1. as the
471 // client's default policy is
472 // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
473 // lossy. And by the time of crimson-osd's GA, the in-cluster communication
474 // will all be performed using v2 protocol.
475 ceph_abort("lossless policy not supported for v1");
477 existing
->protocol
->close(true);
478 return send_connect_reply_ready(reply_tag
, std::move(authorizer_reply
));
// Decide how to answer a connect request when a connection to the same peer
// address already exists. The branches compare the peer's global_seq and
// connect_seq with those recorded on the existing connection and either
// replace it (replace_existing) or send RETRY_GLOBAL / RETRY_SESSION / WAIT /
// RESETSESSION.
// NOTE(review): the dynamic_cast result is dereferenced and no null check is
// visible in this listing; presumably callers only pass v1 connections --
// confirm.
481 seastar::future
<stop_t
> ProtocolV1::handle_connect_with_existing(
482 SocketConnectionRef existing
, bufferlist
&& authorizer_reply
)
484 ProtocolV1
*exproto
= dynamic_cast<ProtocolV1
*>(existing
->protocol
.get());
// the peer's global_seq is older than the one we recorded: tell it ours
486 if (h
.connect
.global_seq
< exproto
->peer_global_seq()) {
487 h
.reply
.global_seq
= exproto
->peer_global_seq();
488 return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL
);
489 } else if (existing
->is_lossy()) {
490 return replace_existing(existing
, std::move(authorizer_reply
));
// peer starts from connect_seq 0 while the existing session is further
// along: replace, flagged as a reset from the peer
491 } else if (h
.connect
.connect_seq
== 0 && exproto
->connect_seq() > 0) {
492 return replace_existing(existing
, std::move(authorizer_reply
), true);
493 } else if (h
.connect
.connect_seq
< exproto
->connect_seq()) {
494 // old attempt, or we sent READY but they didn't get it.
495 h
.reply
.connect_seq
= exproto
->connect_seq() + 1;
496 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION
);
497 } else if (h
.connect
.connect_seq
== exproto
->connect_seq()) {
498 // if the existing connection successfully opened, and/or
499 // subsequently went to standby, then the peer should bump
500 // their connect_seq and retry: this is not a connection race
501 // we need to resolve here.
502 if (exproto
->get_state() == state_t::open
||
503 exproto
->get_state() == state_t::standby
) {
504 if (conn
.policy
.resetcheck
&& exproto
->connect_seq() == 0) {
505 return replace_existing(existing
, std::move(authorizer_reply
));
507 h
.reply
.connect_seq
= exproto
->connect_seq() + 1;
508 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION
);
// otherwise existing->peer_wins() decides between replacing and WAIT
510 } else if (existing
->peer_wins()) {
511 return replace_existing(existing
, std::move(authorizer_reply
));
513 return send_connect_reply(CEPH_MSGR_TAG_WAIT
);
515 } else if (conn
.policy
.resetcheck
&&
516 exproto
->connect_seq() == 0) {
517 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION
);
519 return replace_existing(existing
, std::move(authorizer_reply
));
// Whether message signatures must be required from the connecting peer,
// based on the authorizer protocol in h.connect and the cephx_*_require_
// signatures options: the cluster option is consulted for OSD/MDS/MGR peers,
// the service option for everything else.
// NOTE(review): the bodies of the first two branches are not visible in this
// listing.
523 bool ProtocolV1::require_auth_feature() const
525 if (h
.connect
.authorizer_protocol
!= CEPH_AUTH_CEPHX
) {
528 if (local_conf()->cephx_require_signatures
) {
531 if (h
.connect
.host_type
== CEPH_ENTITY_TYPE_OSD
||
532 h
.connect
.host_type
== CEPH_ENTITY_TYPE_MDS
||
533 h
.connect
.host_type
== CEPH_ENTITY_TYPE_MGR
) {
534 return local_conf()->cephx_cluster_require_signatures
;
536 return local_conf()->cephx_service_require_signatures
;
// Whether cephx v2 must be required from the connecting peer, based on the
// authorizer protocol in h.connect and the cephx_*_require_version options
// (a value >= 2 means required): the cluster option is consulted for
// OSD/MDS/MGR peers, the service option for everything else.
// NOTE(review): the bodies of the first two branches are not visible in this
// listing.
540 bool ProtocolV1::require_cephx_v2_feature() const
542 if (h
.connect
.authorizer_protocol
!= CEPH_AUTH_CEPHX
) {
545 if (local_conf()->cephx_require_version
>= 2) {
548 if (h
.connect
.host_type
== CEPH_ENTITY_TYPE_OSD
||
549 h
.connect
.host_type
== CEPH_ENTITY_TYPE_MDS
||
550 h
.connect
.host_type
== CEPH_ENTITY_TYPE_MGR
) {
551 return local_conf()->cephx_cluster_require_version
>= 2;
553 return local_conf()->cephx_service_require_version
>= 2;
// One round of the server-side connect negotiation; driven by
// seastar::repeat() in start_accept().
// Reads and validates the peer's ceph_msg_connect and authorizer, checks
// protocol version and required features, runs the auth server, and then
// answers either through handle_connect_with_existing(), with RESETSESSION,
// or with send_connect_reply_ready(CEPH_MSGR_TAG_READY).
// NOTE(review): several lines (e.g. the switch on the auth result) are not
// visible in this listing.
557 seastar::future
<stop_t
> ProtocolV1::repeat_handle_connect()
559 return socket
->read(sizeof(h
.connect
))
560 .then([this](bufferlist bl
) {
561 auto p
= bl
.cbegin();
562 ::decode(h
.connect
, p
);
// a peer type that is already known must match what the peer advertises
563 if (conn
.get_peer_type() != 0 &&
564 conn
.get_peer_type() != h
.connect
.host_type
) {
565 logger().error("{} repeat_handle_connect(): my peer type does not match"
566 " what peer advertises {} != {}",
567 conn
, conn
.get_peer_type(), h
.connect
.host_type
);
568 throw std::system_error(make_error_code(error::protocol_aborted
));
570 conn
.set_peer_type(h
.connect
.host_type
);
571 conn
.policy
= messenger
.get_policy(h
.connect
.host_type
);
// a lossless, non-server policy needs a target port to reconnect to
572 if (!conn
.policy
.lossy
&& !conn
.policy
.server
&& conn
.target_addr
.get_port() <= 0) {
573 logger().error("{} we don't know how to reconnect to peer {}",
574 conn
, conn
.target_addr
);
575 throw std::system_error(
576 make_error_code(crimson::net::error::bad_peer_address
));
// read as many authorizer bytes as the connect message announces
578 return socket
->read(h
.connect
.authorizer_len
);
579 }).then([this] (bufferlist authorizer
) {
580 memset(&h
.reply
, 0, sizeof(h
.reply
));
581 // TODO: set reply.protocol_version
582 if (h
.connect
.protocol_version
!= get_proto_version(h
.connect
.host_type
, false)) {
583 return send_connect_reply(
584 CEPH_MSGR_TAG_BADPROTOVER
, bufferlist
{});
// raise the required feature set according to the cephx configuration
586 if (require_auth_feature()) {
587 conn
.policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
589 if (require_cephx_v2_feature()) {
590 conn
.policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
// features we require that the peer does not advertise
592 if (auto feat_missing
= conn
.policy
.features_required
& ~(uint64_t)h
.connect
.features
;
594 return send_connect_reply(
595 CEPH_MSGR_TAG_FEATURES
, bufferlist
{});
598 bufferlist authorizer_reply
;
599 auth_meta
->auth_method
= h
.connect
.authorizer_protocol
;
600 if (!HAVE_FEATURE((uint64_t)h
.connect
.features
, CEPHX_V2
)) {
601 // peer doesn't support it and we won't get here if we require it
602 auth_meta
->skip_authorizer_challenge
= true;
// `more` is true when an authorizer challenge is already recorded in auth_meta
604 auto more
= static_cast<bool>(auth_meta
->authorizer_challenge
);
605 ceph_assert(messenger
.get_auth_server());
606 int r
= messenger
.get_auth_server()->handle_auth_request(
607 conn
.shared_from_this(), auth_meta
, more
, auth_meta
->auth_method
, authorizer
,
611 session_security
.reset();
612 return send_connect_reply(
613 CEPH_MSGR_TAG_BADAUTHORIZER
, std::move(authorizer_reply
));
// a challenge must carry a non-empty reply
615 ceph_assert(authorizer_reply
.length());
616 return send_connect_reply(
617 CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
, std::move(authorizer_reply
));
// is there already a connection registered for this peer address?
621 if (auto existing
= messenger
.lookup_conn(conn
.peer_addr
); existing
) {
622 if (existing
->protocol
->proto_type
!= proto_t::v1
) {
623 logger().warn("{} existing {} proto version is {} not 1, close existing",
625 static_cast<int>(existing
->protocol
->proto_type
));
626 // NOTE: this is following async messenger logic, but we may miss the reset event.
627 existing
->mark_down();
629 return handle_connect_with_existing(existing
, std::move(authorizer_reply
));
// no existing connection but the peer claims an established session
632 if (h
.connect
.connect_seq
> 0) {
633 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION
,
634 std::move(authorizer_reply
));
// fresh session: record the sequence numbers and the common feature set
636 h
.connect_seq
= h
.connect
.connect_seq
+ 1;
637 h
.peer_global_seq
= h
.connect
.global_seq
;
638 conn
.set_features((uint64_t)conn
.policy
.features_supported
& (uint64_t)h
.connect
.features
);
640 return send_connect_reply_ready(CEPH_MSGR_TAG_READY
, std::move(authorizer_reply
));
// Start accepting a connection on `sock` from `_peer_addr`.
// Must be called in state_t::none with no socket. Moves to
// state_t::accepting, hands the connection to messenger.accept_conn() and
// runs the handshake in the background under `gate`: exchange handshake
// headers, repeat repeat_handle_connect() until it stops, then register the
// connection and execute_open(open_t::accepted).
// Failures are logged in the handle_exception() continuation at the end.
644 void ProtocolV1::start_accept(SocketRef
&& sock
,
645 const entity_addr_t
& _peer_addr
)
647 ceph_assert(state
== state_t::none
);
648 logger().trace("{} trigger accepting, was {}",
649 conn
, static_cast<int>(state
));
650 state
= state_t::accepting
;
651 set_write_state(write_state_t::delay
);
653 ceph_assert(!socket
);
654 // until we know better
655 conn
.target_addr
= _peer_addr
;
656 socket
= std::move(sock
);
657 messenger
.accept_conn(
658 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
659 gate
.dispatch_in_background("start_accept", *this, [this] {
660 // stop learning my_addr before sending it out, so it won't change
661 return messenger
.learned_addr(messenger
.get_myaddr(), conn
).then([this] {
662 // encode/send server's handshake header
664 bl
.append(buffer::create_static(banner_size
, banner
));
665 ::encode(messenger
.get_myaddr(), bl
, 0);
666 ::encode(conn
.target_addr
, bl
, 0);
667 return socket
->write_flush(std::move(bl
));
669 // read client's handshake header and connect request
670 return socket
->read(client_header_size
);
671 }).then([this] (bufferlist bl
) {
672 auto p
= bl
.cbegin();
676 ceph_assert(p
.end());
// the advertised address is checked for being legacy/any and on the same
// host as the address we accepted from
677 if ((addr
.is_legacy() || addr
.is_any()) &&
678 addr
.is_same_host(conn
.target_addr
)) {
// NOTE(review): the check above uses conn.target_addr but this message
// prints conn.peer_addr, which is only assigned a few lines below -- confirm
// which address is intended.
681 logger().error("{} peer advertized an invalid peer_addr: {},"
682 " which should be v1 and the same host with {}.",
683 conn
, addr
, conn
.peer_addr
);
684 throw std::system_error(
685 make_error_code(crimson::net::error::bad_peer_address
));
687 conn
.peer_addr
= addr
;
688 conn
.target_addr
= conn
.peer_addr
;
// negotiate until repeat_handle_connect() resolves to stop_t::yes
689 return seastar::repeat([this] {
690 return repeat_handle_connect();
693 if (state
!= state_t::accepting
) {
694 assert(state
== state_t::closing
);
695 throw std::system_error(make_error_code(error::protocol_aborted
));
// move the connection from the accepting set to the registered set
697 messenger
.register_conn(
698 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
699 messenger
.unaccept_conn(
700 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
701 execute_open(open_t::accepted
);
702 }).handle_exception([this] (std::exception_ptr eptr
) {
703 // TODO: handle fault in the accepting state
704 logger().warn("{} accepting fault: {}", conn
, eptr
);
// Serialize pending output into one bufferlist: an optional keepalive2
// request, an optional keepalive2 ack, and the first `num_msgs` messages of
// `msgs` (each as tag, header, payload, middle, data and footer).
// NOTE(review): part of the parameter list and the declarations of `bl` and
// `num_msgs` are not visible in this listing.
712 ceph::bufferlist
ProtocolV1::do_sweep_messages(
713 const std::deque
<MessageRef
>& msgs
,
715 bool require_keepalive
,
716 std::optional
<utime_t
> _keepalive_ack
,
// per-message fixed overhead (tag + header + footer), used to size the
// reserve() calls below; the old footer applies without MSG_AUTH
719 static const size_t RESERVE_MSG_SIZE
= sizeof(CEPH_MSGR_TAG_MSG
) +
720 sizeof(ceph_msg_header
) +
721 sizeof(ceph_msg_footer
);
722 static const size_t RESERVE_MSG_SIZE_OLD
= sizeof(CEPH_MSGR_TAG_MSG
) +
723 sizeof(ceph_msg_header
) +
724 sizeof(ceph_msg_footer_old
);
727 if (likely(num_msgs
)) {
728 if (HAVE_FEATURE(conn
.features
, MSG_AUTH
)) {
729 bl
.reserve(num_msgs
* RESERVE_MSG_SIZE
);
731 bl
.reserve(num_msgs
* RESERVE_MSG_SIZE_OLD
);
// keepalive2 request stamped with the current coarse real-clock time
735 if (unlikely(require_keepalive
)) {
736 k
.req
.stamp
= ceph::coarse_real_clock::to_ceph_timespec(
737 ceph::coarse_real_clock::now());
738 logger().trace("{} write keepalive2 {}", conn
, k
.req
.stamp
.tv_sec
);
739 bl
.append(create_static(k
.req
));
// keepalive2 ack carrying the stamp passed in by the caller
742 if (unlikely(_keepalive_ack
.has_value())) {
743 logger().trace("{} write keepalive2 ack {}", conn
, *_keepalive_ack
);
744 k
.ack
.stamp
= ceph_timespec(*_keepalive_ack
);
745 bl
.append(create_static(k
.ack
));
749 // XXX: we decided not to support lossless connection in v1. as the
750 // client's default policy is
751 // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
752 // lossy. And by the time of crimson-osd's GA, the in-cluster communication
753 // will all be performed using v2 protocol.
754 ceph_abort("lossless policy not supported for v1");
757 std::for_each(msgs
.begin(), msgs
.begin()+num_msgs
, [this, &bl
](const MessageRef
& msg
) {
// a message must not have been assigned a sequence number yet
758 ceph_assert(!msg
->get_seq() && "message already has seq");
759 msg
->set_seq(++conn
.out_seq
);
760 auto& header
= msg
->get_header();
761 header
.src
= messenger
.get_myname();
762 msg
->encode(conn
.features
, messenger
.get_crc_flags());
// sign only when a session security handler is installed
763 if (session_security
) {
764 session_security
->sign_message(msg
.get());
766 logger().debug("{} --> #{} === {} ({})",
767 conn
, msg
->get_seq(), *msg
, msg
->get_type());
768 bl
.append(CEPH_MSGR_TAG_MSG
);
769 bl
.append((const char*)&header
, sizeof(header
));
770 bl
.append(msg
->get_payload());
771 bl
.append(msg
->get_middle());
772 bl
.append(msg
->get_data());
773 auto& footer
= msg
->get_footer();
// with MSG_AUTH the footer is appended as is; otherwise it is converted to
// the old footer layout, zeroing the CRCs that are disabled in the crc flags
774 if (HAVE_FEATURE(conn
.features
, MSG_AUTH
)) {
775 bl
.append((const char*)&footer
, sizeof(footer
));
777 ceph_msg_footer_old old_footer
;
778 if (messenger
.get_crc_flags() & MSG_CRC_HEADER
) {
779 old_footer
.front_crc
= footer
.front_crc
;
780 old_footer
.middle_crc
= footer
.middle_crc
;
782 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
784 if (messenger
.get_crc_flags() & MSG_CRC_DATA
) {
785 old_footer
.data_crc
= footer
.data_crc
;
787 old_footer
.data_crc
= 0;
789 old_footer
.flags
= footer
.flags
;
790 bl
.append((const char*)&old_footer
, sizeof(old_footer
));
// Read the ceph_timespec that follows a KEEPALIVE2_ACK tag and trace its
// tv_sec.
// NOTE(review): the statement between the cast and the trace is not visible
// in this listing.
797 seastar::future
<> ProtocolV1::handle_keepalive2_ack()
799 return socket
->read_exactly(sizeof(ceph_timespec
))
800 .then([this] (auto buf
) {
801 auto t
= reinterpret_cast<const ceph_timespec
*>(buf
.get());
803 logger().trace("{} got keepalive2 ack {}", conn
, t
->tv_sec
);
// Read the ceph_timespec that follows a KEEPALIVE2 tag and pass it, as a
// utime_t, to notify_keepalive_ack() so that an ack gets sent back.
807 seastar::future
<> ProtocolV1::handle_keepalive2()
809 return socket
->read_exactly(sizeof(ceph_timespec
))
810 .then([this] (auto buf
) {
811 utime_t ack
{*reinterpret_cast<const ceph_timespec
*>(buf
.get())};
812 notify_keepalive_ack(ack
);
// Read the ceph_le64 sequence number that follows an ACK tag and pass it to
// discard_up_to() for conn.sent.
816 seastar::future
<> ProtocolV1::handle_ack()
818 return socket
->read_exactly(sizeof(ceph_le64
))
819 .then([this] (auto buf
) {
820 auto seq
= reinterpret_cast<const ceph_le64
*>(buf
.get());
821 discard_up_to(&conn
.sent
, *seq
);
// Acquire throttle budget for the message whose header is in m.header.
// Resolves immediately when the policy has no byte throttler; otherwise
// returns the future of throttler_bytes->get(to_read).
// NOTE(review): the last term of the to_read sum is not visible in this
// listing (presumably m.header.data_len -- confirm).
825 seastar::future
<> ProtocolV1::maybe_throttle()
827 if (!conn
.policy
.throttler_bytes
) {
828 return seastar::now();
830 const auto to_read
= (m
.header
.front_len
+
831 m
.header
.middle_len
+
833 return conn
.policy
.throttler_bytes
->get(to_read
);
// Read one message from the socket and dispatch it.
// Reads header, front, middle, data and footer in that order (throttling
// after the header), decodes the message, verifies its signature when a
// session security handler is installed, checks its sequence number and
// hands it to dispatchers.ms_dispatch().
// Throws std::system_error(error::corrupted_message) when decoding or the
// signature check fails.
836 seastar::future
<> ProtocolV1::read_message()
838 return socket
->read(sizeof(m
.header
))
839 .then([this] (bufferlist bl
) {
840 // throttle the traffic, maybe
841 auto p
= bl
.cbegin();
842 ::decode(m
.header
, p
);
843 return maybe_throttle();
// the three variable-length sections use the lengths from the header
846 return socket
->read(m
.header
.front_len
);
847 }).then([this] (bufferlist bl
) {
848 m
.front
= std::move(bl
);
850 return socket
->read(m
.header
.middle_len
);
851 }).then([this] (bufferlist bl
) {
852 m
.middle
= std::move(bl
);
854 return socket
->read(m
.header
.data_len
);
855 }).then([this] (bufferlist bl
) {
856 m
.data
= std::move(bl
);
858 return socket
->read(sizeof(m
.footer
));
859 }).then([this] (bufferlist bl
) {
860 auto p
= bl
.cbegin();
861 ::decode(m
.footer
, p
);
862 auto conn_ref
= seastar::static_pointer_cast
<SocketConnection
>(
863 conn
.shared_from_this());
864 auto msg
= ::decode_message(nullptr, 0, m
.header
, m
.footer
,
865 m
.front
, m
.middle
, m
.data
, conn_ref
);
// a null result from decode_message() is treated as a corrupted message
866 if (unlikely(!msg
)) {
867 logger().warn("{} decode message failed", conn
);
868 throw std::system_error
{make_error_code(error::corrupted_message
)};
870 constexpr bool add_ref
= false; // Message starts with 1 ref
871 // TODO: change MessageRef with foreign_ptr
872 auto msg_ref
= MessageRef
{msg
, add_ref
};
// a non-zero result from check_message_signature() is treated as failure
874 if (session_security
) {
875 if (unlikely(session_security
->check_message_signature(msg
))) {
876 logger().warn("{} message signature check failed", conn
);
877 throw std::system_error
{make_error_code(error::corrupted_message
)};
880 // TODO: set time stamps
881 msg
->set_byte_throttler(conn
.policy
.throttler_bytes
);
// when update_rx_seq() rejects the sequence number the message is not
// dispatched
883 if (unlikely(!conn
.update_rx_seq(msg
->get_seq()))) {
885 return seastar::now();
888 logger().debug("{} <== #{} === {} ({})",
889 conn
, msg_ref
->get_seq(), *msg_ref
, msg_ref
->get_type());
890 // throttle the reading process by the returned future
891 return dispatchers
.ms_dispatch(conn_ref
, std::move(msg_ref
));
// Main read loop of an open connection: keep reading a one-byte tag and
// dispatch on it until a handler throws.
// CLOSE throws std::system_error(error::protocol_aborted); an unknown tag is
// logged and throws std::system_error(error::read_eof).
// NOTE(review): the switch header and the ACK handler call are not visible
// in this listing.
895 seastar::future
<> ProtocolV1::handle_tags()
897 return seastar::keep_doing([this] {
899 return socket
->read_exactly(1)
900 .then([this] (auto buf
) {
902 case CEPH_MSGR_TAG_MSG
:
903 return read_message();
904 case CEPH_MSGR_TAG_ACK
:
// a plain KEEPALIVE carries no payload and needs no action
906 case CEPH_MSGR_TAG_KEEPALIVE
:
907 return seastar::now();
908 case CEPH_MSGR_TAG_KEEPALIVE2
:
909 return handle_keepalive2();
910 case CEPH_MSGR_TAG_KEEPALIVE2_ACK
:
911 return handle_keepalive2_ack();
912 case CEPH_MSGR_TAG_CLOSE
:
913 logger().info("{} got tag close", conn
);
914 throw std::system_error(make_error_code(error::protocol_aborted
));
// NOTE(review): an unknown tag is reported as read_eof, which execute_open()
// handles together with protocol_aborted and connection_reset -- confirm
// this is intended.
916 logger().error("{} got unknown msgr tag {}",
917 conn
, static_cast<int>(buf
[0]));
918 throw std::system_error(make_error_code(error::read_eof
));
// Enter state_t::open: enable writes, notify the dispatchers
// (ms_handle_connect for open_t::connected, ms_handle_accept otherwise) and
// start the tag-processing loop in the background under `gate`.
// std::system_error failures of that loop are logged and handled first; any
// other exception is logged by the trailing handle_exception().
// NOTE(review): the call that starts the loop and the bodies of the error
// branches are not visible in this listing.
924 void ProtocolV1::execute_open(open_t type
)
926 logger().trace("{} trigger open, was {}", conn
, static_cast<int>(state
));
927 state
= state_t::open
;
928 set_write_state(write_state_t::open
);
930 if (type
== open_t::connected
) {
931 dispatchers
.ms_handle_connect(
932 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
933 } else { // type == open_t::accepted
934 dispatchers
.ms_handle_accept(
935 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
938 gate
.dispatch_in_background("execute_open", *this, [this] {
939 // start background processing of tags
941 .handle_exception_type([this] (const std::system_error
& e
) {
942 logger().warn("{} open fault: {}", conn
, e
);
// these three error codes share one branch
943 if (e
.code() == error::protocol_aborted
||
944 e
.code() == std::errc::connection_reset
||
945 e
.code() == error::read_eof
) {
947 return seastar::now();
951 }).handle_exception([this] (std::exception_ptr eptr
) {
952 // TODO: handle fault in the open state
953 logger().warn("{} open fault: {}", conn
, eptr
);
// Begin closing: tell the messenger the connection is closing, remove it
// from the accepting set (state accepting) or the registered set (states
// from connecting up to, but not including, closing), then move to
// state_t::closing.
// NOTE(review): the branch containing the ceph_assert on
// state_t::connecting is only partly visible in this listing.
961 void ProtocolV1::trigger_close()
963 logger().trace("{} trigger closing, was {}",
964 conn
, static_cast<int>(state
));
965 messenger
.closing_conn(
966 seastar::static_pointer_cast
<SocketConnection
>(
967 conn
.shared_from_this()));
969 if (state
== state_t::accepting
) {
970 messenger
.unaccept_conn(seastar::static_pointer_cast
<SocketConnection
>(
971 conn
.shared_from_this()));
// relies on the declaration order of the state_t enumerators
972 } else if (state
>= state_t::connecting
&& state
< state_t::closing
) {
973 messenger
.unregister_conn(seastar::static_pointer_cast
<SocketConnection
>(
974 conn
.shared_from_this()));
981 ceph_assert(state
== state_t::connecting
);
984 state
= state_t::closing
;
// Tell the messenger that this connection has finished closing.
987 void ProtocolV1::on_closed()
989 messenger
.closed_conn(
990 seastar::static_pointer_cast
<SocketConnection
>(
991 conn
.shared_from_this()));
// Handle a connection fault. For a lossy policy the connection is
// unregistered from the messenger; the lossless case is not supported in v1
// and aborts.
// NOTE(review): the remainder of the lossy branch is not visible in this
// listing.
994 seastar::future
<> ProtocolV1::fault()
996 if (conn
.policy
.lossy
) {
997 messenger
.unregister_conn(seastar::static_pointer_cast
<SocketConnection
>(
998 conn
.shared_from_this()));
1000 // XXX: we decided not to support lossless connection in v1. as the
1001 // client's default policy is
1002 // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
1003 // lossy. And by the time of crimson-osd's GA, the in-cluster communication
1004 // will all be performed using v2 protocol.
1005 ceph_abort("lossless policy not supported for v1");
// this return follows ceph_abort(); presumably it only satisfies the return type
1006 return seastar::now();
// Write a description of this protocol object to `out`.
// NOTE(review): the body is not visible in this listing.
1009 void ProtocolV1::print(std::ostream
& out
) const
1014 } // namespace crimson::net