git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/ProtocolV2.cc
import 15.2.5
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
index c69f2ccf79ad26cca248f6ed1dd9c7ce2ae351a4..544dc40dad10a0c88bbf77308c105d4fc576ec82 100644 (file)
@@ -24,6 +24,8 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                << " :" << connection->port
                 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
                 << " cs=" << connect_seq << " l=" << connection->policy.lossy
+                << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
+                                                  REVISION_1)
                 << " rx=" << session_stream_handlers.rx.get()
                 << " tx=" << session_stream_handlers.tx.get()
                 << ").";
@@ -81,7 +83,7 @@ if(connection->interceptor) { \
 ProtocolV2::ProtocolV2(AsyncConnection *connection)
     : Protocol(2, connection),
       state(NONE),
-      peer_required_features(0),
+      peer_supported_features(0),
       client_cookie(0),
       server_cookie(0),
       global_seq(0),
@@ -92,6 +94,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
+      tx_frame_asm(&session_stream_handlers, false),
+      rx_frame_asm(&session_stream_handlers, false),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -263,11 +267,11 @@ void ProtocolV2::reset_recv_state() {
 }
 
 size_t ProtocolV2::get_current_msg_size() const {
-  ceph_assert(!rx_segments_desc.empty());
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
   size_t sum = 0;
   // we don't include SegmentIndex::Msg::HEADER.
-  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
-    sum += rx_segments_desc[idx].length;
+  for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
+    sum += rx_frame_asm.get_segment_logical_len(i);
   }
   return sum;
 }
@@ -573,11 +577,14 @@ template <class F>
 bool ProtocolV2::append_frame(F& frame) {
   ceph::bufferlist bl;
   try {
-    bl = frame.get_buffer(session_stream_handlers);
+    bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return false;
   }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   connection->outgoing_bl.append(bl);
   return true;
 }
@@ -732,26 +739,6 @@ bool ProtocolV2::is_queued() {
   return !out_queue.empty() || connection->is_queued();
 }
 
-uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
-  if (session_stream_handlers.rx) {
-    return segment_onwire_size(logical_size);
-  } else {
-    return logical_size;
-  }
-}
-
-uint32_t ProtocolV2::get_epilogue_size() const {
-  // In secure mode size of epilogue is flexible and depends on particular
-  // cipher implementation. See the comment for epilogue_secure_block_t or
-  // epilogue_plain_block_t.
-  if (session_stream_handlers.rx) {
-    return FRAME_SECURE_EPILOGUE_SIZE + \
-        session_stream_handlers.rx->get_extra_size_at_final();
-  } else {
-    return FRAME_PLAIN_EPILOGUE_SIZE;
-  }
-}
-
 CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
                        rx_buffer_t &&buffer) {
   const auto len = buffer->length();
@@ -787,11 +774,14 @@ CtPtr ProtocolV2::write(const std::string &desc,
                         F &frame) {
   ceph::bufferlist bl;
   try {
-    bl = frame.get_buffer(session_stream_handlers);
+    bl = frame.get_buffer(tx_frame_asm);
   } catch (ceph::crypto::onwire::TxHandlerError &e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return _fault();
   }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   return write(desc, next, bl);
 }
 
@@ -939,13 +929,15 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
     return nullptr;
   }
 
-  this->peer_required_features = peer_required_features;
-  if (this->peer_required_features == 0) {
+  this->peer_supported_features = peer_supported_features;
+  if (peer_required_features == 0) {
     this->connection_features = msgr2_required;
   }
 
-  // at this point we can change how the client protocol behaves based on
-  // this->peer_required_features
+  // if the peer supports msgr2.1, switch to it
+  bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+  tx_frame_asm.set_is_rev1(is_rev1);
+  rx_frame_asm.set_is_rev1(is_rev1);
 
   if (state == BANNER_CONNECTING) {
     state = HELLO_CONNECTING;
@@ -1061,86 +1053,48 @@ CtPtr ProtocolV2::read_frame() {
   }
 
   ldout(cct, 20) << __func__ << dendl;
-  return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
+  rx_preamble.clear();
+  rx_epilogue.clear();
+  rx_segments_data.clear();
+
+  return READ(rx_frame_asm.get_preamble_onwire_len(),
+              handle_read_frame_preamble_main);
 }
 
 CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
+    ldout(cct, 1) << __func__ << " read frame preamble failed r=" << r
                   << " (" << cpp_strerror(r) << ")" << dendl;
     return _fault();
   }
 
-  ceph::bufferlist preamble;
-  preamble.push_back(std::move(buffer));
+  rx_preamble.push_back(std::move(buffer));
 
   ldout(cct, 30) << __func__ << " preamble\n";
-  preamble.hexdump(*_dout);
+  rx_preamble.hexdump(*_dout);
   *_dout << dendl;
 
-  if (session_stream_handlers.rx) {
-    ceph_assert(session_stream_handlers.rx);
-
-    session_stream_handlers.rx->reset_rx_handler();
-    preamble = session_stream_handlers.rx->authenticated_decrypt_update(
-      std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
+  try {  // disassemble_preamble() failures below both fault the connection
+    next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+  } catch (FrameError& e) {
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  } catch (ceph::crypto::onwire::MsgAuthError&) {
+    ldout(cct, 1) << __func__ << " bad auth tag" << dendl;
+    return _fault();
+  }
 
-    ldout(cct, 10) << __func__ << " got encrypted preamble."
-                   << " after decrypt premable.length()=" << preamble.length()
-                   << dendl;
+  ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
+                 << dendl;
 
+  if (session_stream_handlers.rx) {
     ldout(cct, 30) << __func__ << " preamble after decrypt\n";
-    preamble.hexdump(*_dout);
+    rx_preamble.hexdump(*_dout);
     *_dout << dendl;
   }
 
-  {
-    // I expect ceph_le32 will make the endian conversion for me. Passing
-    // everything through ::Decode is unnecessary.
-    const auto& main_preamble = \
-      reinterpret_cast<preamble_block_t&>(*preamble.c_str());
-
-    // verify preamble's CRC before any further processing
-    const auto rx_crc = ceph_crc32c(0,
-      reinterpret_cast<const unsigned char*>(&main_preamble),
-      sizeof(main_preamble) - sizeof(main_preamble.crc));
-    if (rx_crc != main_preamble.crc) {
-      ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
-                    << " rx_crc=" << rx_crc
-                    << " tx_crc=" << main_preamble.crc << dendl;
-      return _fault();
-    }
-
-    // currently we do support between 1 and MAX_NUM_SEGMENTS segments
-    if (main_preamble.num_segments < 1 ||
-        main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-      ldout(cct, 10) << __func__ << " unsupported num_segments="
-                    << " tx_crc=" << main_preamble.num_segments << dendl;
-      return _fault();
-    }
-
-    next_tag = static_cast<Tag>(main_preamble.tag);
-
-    rx_segments_desc.clear();
-    rx_segments_data.clear();
-
-    if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-      ldout(cct, 30) << __func__
-                    << " num_segments=" << main_preamble.num_segments
-                    << " is too much" << dendl;
-      return _fault();
-    }
-    for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
-      ldout(cct, 10) << __func__ << " got new segment:"
-                    << " len=" << main_preamble.segments[idx].length
-                    << " align=" << main_preamble.segments[idx].alignment
-                    << dendl;
-      rx_segments_desc.emplace_back(main_preamble.segments[idx]);
-    }
-  }
-
   // does it need throttle?
   if (next_tag == Tag::MESSAGE) {
     if (state != READY) {
@@ -1193,21 +1147,26 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
 }
 
 CtPtr ProtocolV2::read_frame_segment() {
-  ldout(cct, 20) << __func__ << dendl;
-  ceph_assert(!rx_segments_desc.empty());
+  size_t seg_idx = rx_segments_data.size();
+  ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
+  rx_segments_data.emplace_back();
+
+  uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+  if (onwire_len == 0) {
+    return _handle_read_frame_segment();
+  }
 
-  // description of current segment to read
-  const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
   rx_buffer_t rx_buffer;
+  uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
   try {
     rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
-      get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
+        onwire_len, align));
   } catch (std::bad_alloc&) {
     // Catching because of potential issues with satisfying alignment.
-    ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
-                  << " len=" << get_onwire_size(cur_rx_desc.length)
-                  << " align=" << cur_rx_desc.alignment
-                  << dendl;
+    ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
+                  << " len=" << onwire_len
+                  << " align=" << align
+                  << dendl;
     return _fault();
   }
 
@@ -1223,35 +1182,21 @@ CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
     return _fault();
   }
 
-  rx_segments_data.emplace_back();
   rx_segments_data.back().push_back(std::move(rx_buffer));
+  return _handle_read_frame_segment();
+}
 
-  // decrypt incoming data
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  if (session_stream_handlers.rx) {
-    ceph_assert(session_stream_handlers.rx);
-
-    auto& new_seg = rx_segments_data.back();
-    if (new_seg.length()) {
-      auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
-          std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
-      const auto idx = rx_segments_data.size() - 1;
-      new_seg.clear();
-      padded.splice(0, rx_segments_desc[idx].length, &new_seg);
-
-      ldout(cct, 20) << __func__
-                     << " unpadded new_seg.length()=" << new_seg.length()
-                     << dendl;
-    }
-  }
-
-  if (rx_segments_desc.size() == rx_segments_data.size()) {
+CtPtr ProtocolV2::_handle_read_frame_segment() {
+  if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
     // OK, all segments planned to read are read. Can go with epilogue.
-    return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
-  } else {
-    // TODO: for makeshift only. This will be more generic and throttled
-    return read_frame_segment();
+    uint32_t epilogue_onwire_len = rx_frame_asm.get_epilogue_onwire_len();
+    if (epilogue_onwire_len == 0) {
+      return _handle_read_frame_epilogue_main();
+    }
+    return READ(epilogue_onwire_len, handle_read_frame_epilogue_main);
   }
+  // TODO: for makeshift only. This will be more generic and throttled
+  return read_frame_segment();
 }
 
 CtPtr ProtocolV2::handle_frame_payload() {
@@ -1346,66 +1291,38 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " read data error " << dendl;
+    ldout(cct, 1) << __func__ << " read frame epilogue failed r=" << r
+                  << " (" << cpp_strerror(r) << ")" << dendl;
     return _fault();
   }
 
-  __u8 late_flags;
+  rx_epilogue.push_back(std::move(buffer));
+  return _handle_read_frame_epilogue_main();
+}
 
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  if (session_stream_handlers.rx) {
-    ldout(cct, 1) << __func__ << " read frame epilogue bytes="
-                  << get_epilogue_size() << dendl;
-
-    // decrypt epilogue and authenticate entire frame.
-    ceph::bufferlist epilogue_bl;
-    {
-      epilogue_bl.push_back(std::move(buffer));
-      try {
-        epilogue_bl =
-            session_stream_handlers.rx->authenticated_decrypt_update_final(
-               std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
-      } catch (ceph::crypto::onwire::MsgAuthError &e) {
-        ldout(cct, 5) << __func__ << " message authentication failed: "
-                      << e.what() << dendl;
-        return _fault();
-      }
-    }
-    auto& epilogue =
-        reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
-    late_flags = epilogue.late_flags;
-  } else {
-    auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
-
-    for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
-      const __u32 expected_crc = epilogue.crc_values[idx];
-      const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
-      if (expected_crc != calculated_crc) {
-       ldout(cct, 5) << __func__ << " message integrity check failed: "
-                     << " expected_crc=" << expected_crc
-                     << " calculated_crc=" << calculated_crc
-                     << dendl;
-       return _fault();
-      } else {
-       ldout(cct, 20) << __func__ << " message integrity check success: "
-                      << " expected_crc=" << expected_crc
-                      << " calculated_crc=" << calculated_crc
-                      << dendl;
-      }
-    }
-    late_flags = epilogue.late_flags;
+CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
+  bool aborted;  // true when disassemble_remaining_segments() returns false
+  try {
+    rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
+    aborted = !rx_frame_asm.disassemble_remaining_segments(
+        rx_segments_data.data(), rx_epilogue);
+  } catch (FrameError& e) {
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  } catch (ceph::crypto::onwire::MsgAuthError&) {
+    ldout(cct, 1) << __func__ << " bad auth tag" << dendl;
+    return _fault();
   }
 
   // we do have a mechanism that allows transmitter to start sending message
   // and abort after putting entire data field on wire. This will be used by
   // the kernel client to avoid unnecessary buffering.
-  if (late_flags & FRAME_FLAGS_LATEABRT) {
+  if (aborted) {
     reset_throttle();
     state = READY;
     return CONTINUE(read_frame);
-  } else {
-    return handle_read_frame_dispatch();
   }
+  return handle_read_frame_dispatch();
 }
 
 CtPtr ProtocolV2::handle_message() {
@@ -1417,9 +1334,8 @@ CtPtr ProtocolV2::handle_message() {
 #endif
   recv_stamp = ceph_clock_now();
 
-  // we need to get the size before std::moving segments data
   const size_t cur_msg_size = get_current_msg_size();
-  auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+  auto msg_frame = MessageFrame::Decode(rx_segments_data);
 
   // XXX: paranoid copy just to avoid oops
   ceph_msg_header2 current_header = msg_frame.header();
@@ -1541,9 +1457,8 @@ CtPtr ProtocolV2::handle_message() {
   }
 
   connection->logger->inc(l_msgr_recv_messages);
-  connection->logger->inc(
-      l_msgr_recv_bytes,
-      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+  connection->logger->inc(l_msgr_recv_bytes,
+                          rx_frame_asm.get_frame_onwire_len());
 
   messenger->ms_fast_preprocess(message);
   fast_dispatch_time = ceph::mono_clock::now();
@@ -1895,8 +1810,9 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
     return _fault();
   }
   auth_meta->con_mode = auth_done.con_mode();
-  session_stream_handlers = \
-    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
+  bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+  session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+      cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/false);
 
   state = AUTH_CONNECTING_SIGN;
 
@@ -2131,8 +2047,8 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
                 << " features_supported=" << std::hex
                 << server_ident.supported_features()
                 << " features_required=" << server_ident.required_features()
-                << " flags=" << server_ident.flags() << " cookie=" << std::dec
-                << server_ident.cookie() << dendl;
+                << " flags=" << server_ident.flags()
+                << " cookie=" << server_ident.cookie() << std::dec << dendl;
 
   // is this who we intended to talk to?
   // be a bit forgiving here, since we may be connecting based on addresses parsed out
@@ -2275,8 +2191,9 @@ CtPtr ProtocolV2::finish_auth()
   ceph_assert(auth_meta);
   // TODO: having a possibility to check whether we're server or client could
   // allow reusing finish_auth().
-  session_stream_handlers = \
-    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
+  bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+  session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+      cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/true);
 
   const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
     auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
@@ -2725,6 +2642,10 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   exproto->pre_auth.enabled = false;
 
   if (!reconnecting) {
+    exproto->peer_supported_features = peer_supported_features;
+    exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
+    exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());
+
     exproto->client_cookie = client_cookie;
     exproto->peer_name = peer_name;
     exproto->connection_features = connection_features;
@@ -2877,8 +2798,8 @@ CtPtr ProtocolV2::send_server_ident() {
                 << connection->policy.features_supported
                 << " features_required="
                            << (connection->policy.features_required | msgr2_required)
-                << " flags=" << flags << " cookie=" << std::dec << server_cookie
-                << dendl;
+                << " flags=" << flags
+                << " cookie=" << server_cookie << std::dec << dendl;
 
   connection->lock.unlock();
   // Because "replacing" will prevent other connections preempt this addr,