]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/ProtocolV2.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
index b7137b8b83b94727c8ad1cae1e19a583b1024fa9..fa47da50832c0956441af3af05033116d66ead35 100644 (file)
@@ -27,7 +27,12 @@ using crimson::common::local_conf;
 
 namespace {
 
-// TODO: apply the same logging policy to Protocol V1
+// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
+const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
+  (CEPH_MSGR2_FEATURE_REVISION_1 |
+   // CEPH_MSGR2_FEATURE_COMPRESSION |
+   UINT64_C(0));
+
 // Log levels in V2 Protocol:
 // * error level, something error that cause connection to terminate:
 //   - fatal errors;
@@ -304,10 +309,9 @@ seastar::future<> ProtocolV2::read_frame_payload()
     logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
     bool ok = false;
     try {
-      rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
       bufferlist rx_epilogue;
       rx_epilogue.append(buffer::create(std::move(bl)));
-      ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+      ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
     } catch (FrameError& e) {
       logger().error("read_frame_payload: {} {}", conn, e.what());
       abort_in_fault();
@@ -396,7 +400,7 @@ ProtocolV2::banner_exchange(bool is_connect)
 {
   // 1. prepare and send banner
   bufferlist banner_payload;
-  encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+  encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
   encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
 
   bufferlist bl;
@@ -407,7 +411,8 @@ ProtocolV2::banner_exchange(bool is_connect)
   logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
                  "required={}, banner=\"{}\"",
                  conn, bl.length(), len_payload,
-                 CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+                 CRIMSON_MSGR2_SUPPORTED_FEATURES,
+                 CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
   INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
   return write_flush(std::move(bl)).then([this] {
@@ -462,7 +467,7 @@ ProtocolV2::banner_exchange(bool is_connect)
                      peer_supported_features, peer_required_features);
 
       // Check feature bit compatibility
-      uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+      uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
       uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
       if ((required_features & peer_supported_features) != required_features) {
         logger().error("{} peer does not support all required features"
@@ -1204,18 +1209,7 @@ ProtocolV2::server_connect()
       throw std::system_error(
           make_error_code(crimson::net::error::bad_peer_address));
     }
-    // TODO: change peer_addr to entity_addrvec_t
-    entity_addr_t paddr = client_ident.addrs().front();
-    if ((paddr.is_msgr2() || paddr.is_any()) &&
-        paddr.is_same_host(conn.target_addr)) {
-      // good
-    } else {
-      logger().warn("{} peer's address {} is not v2 or not the same host with {}",
-                    conn, paddr, conn.target_addr);
-      throw std::system_error(
-          make_error_code(crimson::net::error::bad_peer_address));
-    }
-    conn.peer_addr = paddr;
+    conn.peer_addr = client_ident.addrs().front();
     logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
     conn.target_addr = conn.peer_addr;
     if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
@@ -1474,6 +1468,7 @@ void ProtocolV2::execute_accepting()
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
+          session_comp_handlers = { nullptr, nullptr };
           enable_recording();
           return banner_exchange(false);
         }).then([this] (auto&& ret) {
@@ -1487,8 +1482,9 @@ void ProtocolV2::execute_accepting()
                         conn, ceph_entity_type_name(_peer_type),
                         conn.policy.lossy, conn.policy.server,
                         conn.policy.standby, conn.policy.resetcheck);
-          if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
-              messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+          if (!messenger.get_myaddr().is_blank_ip() &&
+              (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+              messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
             logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
                           conn, _my_addr_from_peer, messenger.get_myaddr());
             throw std::system_error(
@@ -1792,7 +1788,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 // READY state
 
 ceph::bufferlist ProtocolV2::do_sweep_messages(
-    const std::deque<MessageRef>& msgs,
+    const std::deque<MessageURef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
     std::optional<utime_t> _keepalive_ack,
@@ -1812,13 +1808,13 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
   }
 
-  if (require_ack && !num_msgs) {
+  if (require_ack && num_msgs == 0u) {
     auto ack_frame = AckFrame::Encode(conn.in_seq);
     bl.append(ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
 
-  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
     // TODO: move to common code
     // set priority
     msg->get_header().src = messenger.get_myname();
@@ -1834,8 +1830,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     ceph_msg_header2 header2{header.seq,        header.tid,
                              header.type,       header.priority,
                              header.version,
-                             init_le32(0),      header.data_off,
-                             init_le64(conn.in_seq),
+                             ceph_le32(0),      header.data_off,
+                             ceph_le64(conn.in_seq),
                              footer.flags,      header.compat_version,
                              header.reserved};
 
@@ -1873,16 +1869,16 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
                            current_header.type,
                            current_header.priority,
                            current_header.version,
-                           init_le32(msg_frame.front_len()),
-                           init_le32(msg_frame.middle_len()),
-                           init_le32(msg_frame.data_len()),
+                           ceph_le32(msg_frame.front_len()),
+                           ceph_le32(msg_frame.middle_len()),
+                           ceph_le32(msg_frame.data_len()),
                            current_header.data_off,
                            conn.get_peer_name(),
                            current_header.compat_version,
                            current_header.reserved,
-                           init_le32(0)};
-    ceph_msg_footer footer{init_le32(0), init_le32(0),
-                           init_le32(0), init_le64(0), current_header.flags};
+                           ceph_le32(0)};
+    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                           ceph_le32(0), ceph_le64(0), current_header.flags};
 
     auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this());