]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/ProtocolV2.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
index c9b2d55e1d825c5f3116ae574c1b2d92a2c4cc0f..a176fc2c808ac9eb8d465310781daf4eacc20b98 100644 (file)
@@ -26,8 +26,10 @@ std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                 << " cs=" << connect_seq << " l=" << connection->policy.lossy
                 << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
                                                   REVISION_1)
-                << " rx=" << session_stream_handlers.rx.get()
+                << " crypto rx=" << session_stream_handlers.rx.get()
                 << " tx=" << session_stream_handlers.tx.get()
+                << " comp rx=" << session_compression_handlers.rx.get()
+                << " tx=" << session_compression_handlers.tx.get()
                 << ").";
 }
 
@@ -94,8 +96,10 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
-      tx_frame_asm(&session_stream_handlers, false),
-      rx_frame_asm(&session_stream_handlers, false),
+      tx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+                   &session_compression_handlers),
+      rx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+                   &session_compression_handlers),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -246,15 +250,17 @@ void ProtocolV2::reset_recv_state() {
     // `write_event()` unlocks it just before calling `write_message()`.
     // `submit_to()` here is NOT blocking.
     connection->center->submit_to(connection->center->get_id(), [this] {
-      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
                     << dendl;
       // Possibly unnecessary. See the comment in `deactivate_existing`.
       std::lock_guard<std::mutex> l(connection->lock);
       std::lock_guard<std::mutex> wl(connection->write_lock);
       reset_security();
+      reset_compression();
     }, /* always_async = */true);
   } else {
     reset_security();
+    reset_compression();
   }
 
   // clean read and write callbacks
@@ -527,8 +533,8 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
   ceph_msg_header2 header2{header.seq,        header.tid,
                            header.type,       header.priority,
                            header.version,
-                           init_le32(0),      header.data_off,
-                           init_le64(ack_seq),
+                           ceph_le32(0),      header.data_off,
+                           ceph_le64(ack_seq),
                            footer.flags,      header.compat_version,
                            header.reserved};
 
@@ -585,7 +591,7 @@ bool ProtocolV2::append_frame(F& frame) {
 
   ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
                  << " bytes " << tx_frame_asm << dendl;
-  connection->outgoing_bl.append(bl);
+  connection->outgoing_bl.claim_append(bl);
   return true;
 }
 
@@ -617,6 +623,14 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
   }
 }
 
+void ProtocolV2::reset_compression() {
+  ldout(cct, 5) << __func__ << dendl;
+
+  comp_meta = CompConnectionMeta{};
+  session_compression_handlers.rx.reset(nullptr);
+  session_compression_handlers.tx.reset(nullptr);
+}
+
 void ProtocolV2::write_event() {
   ldout(cct, 10) << __func__ << dendl;
   ssize_t r = 0;
@@ -1104,6 +1118,7 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
       lderr(cct) << __func__ << " not in ready state!" << dendl;
       return _fault();
     }
+    recv_stamp = ceph_clock_now();
     state = THROTTLE_MESSAGE;
     return CONTINUE(throttle_message);
   } else {
@@ -1135,6 +1150,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
     case Tag::WAIT:
+    case Tag::COMPRESSION_REQUEST:
+    case Tag::COMPRESSION_DONE:
       return handle_frame_payload();
     case Tag::MESSAGE:
       return handle_message();
@@ -1249,6 +1266,10 @@ CtPtr ProtocolV2::handle_frame_payload() {
       return handle_message_ack(payload);
     case Tag::WAIT:
       return handle_wait(payload);
+    case Tag::COMPRESSION_REQUEST:
+      return handle_compression_request(payload);
+    case Tag::COMPRESSION_DONE:
+      return handle_compression_done(payload);
     default:
       ceph_abort();
   }
@@ -1304,11 +1325,9 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
 }
 
 CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
-  bool aborted;
+  bool ok = false;
   try {
-    rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
-    aborted = !rx_frame_asm.disassemble_remaining_segments(
-        rx_segments_data.data(), rx_epilogue);
+    ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
   } catch (FrameError& e) {
     ldout(cct, 1) << __func__ << " " << e.what() << dendl;
     return _fault();
@@ -1320,7 +1339,7 @@ CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
   // we do have a mechanism that allows transmitter to start sending message
   // and abort after putting entire data field on wire. This will be used by
   // the kernel client to avoid unnecessary buffering.
-  if (aborted) {
+  if (!ok) {
     reset_throttle();
     state = READY;
     return CONTINUE(read_frame);
@@ -1332,11 +1351,6 @@ CtPtr ProtocolV2::handle_message() {
   ldout(cct, 20) << __func__ << dendl;
   ceph_assert(state == THROTTLE_DONE);
 
-#if defined(WITH_EVENTTRACE)
-  utime_t ltt_recv_stamp = ceph_clock_now();
-#endif
-  recv_stamp = ceph_clock_now();
-
   const size_t cur_msg_size = get_current_msg_size();
   auto msg_frame = MessageFrame::Decode(rx_segments_data);
 
@@ -1359,16 +1373,16 @@ CtPtr ProtocolV2::handle_message() {
                          current_header.type,
                          current_header.priority,
                          current_header.version,
-                         init_le32(msg_frame.front_len()),
-                         init_le32(msg_frame.middle_len()),
-                         init_le32(msg_frame.data_len()),
+                         ceph_le32(msg_frame.front_len()),
+                         ceph_le32(msg_frame.middle_len()),
+                         ceph_le32(msg_frame.data_len()),
                          current_header.data_off,
                          peer_name,
                          current_header.compat_version,
                          current_header.reserved,
-                         init_le32(0)};
-  ceph_msg_footer footer{init_le32(0), init_le32(0),
-                        init_le32(0), init_le64(0), current_header.flags};
+                         ceph_le32(0)};
+  ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                        ceph_le32(0), ceph_le64(0), current_header.flags};
 
   Message *message = decode_message(cct, 0, header, footer,
       msg_frame.front(),
@@ -1425,7 +1439,7 @@ CtPtr ProtocolV2::handle_message() {
       message->get_type() == CEPH_MSG_OSD_OPREPLY) {
     utime_t ltt_processed_stamp = ceph_clock_now();
     double usecs_elapsed =
-        (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
+      ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
     ostringstream buf;
     if (message->get_type() == CEPH_MSG_OSD_OP)
       OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
@@ -1828,6 +1842,29 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
 }
 
 CtPtr ProtocolV2::finish_client_auth() {
+  if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+    return send_compression_request();
+  }
+
+  return start_session_connect();
+}
+
+CtPtr ProtocolV2::finish_server_auth() {
+  // server had sent AuthDone and client responded with correct pre-auth
+  // signature. 
+  // We can start conditioanl msgr protocol
+  if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+    state = COMPRESSION_ACCEPTING;
+  } else {
+    // No msgr protocol features to process
+    // we can start accepting new sessions/reconnects.
+    state = SESSION_ACCEPTING;
+  }
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::start_session_connect() {
   if (!server_cookie) {
     ceph_assert(connect_seq == 0);
     state = SESSION_CONNECTING;
@@ -2087,6 +2124,40 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
   return ready();
 }
 
+CtPtr ProtocolV2::send_compression_request() {
+  state = COMPRESSION_CONNECTING;
+
+  const entity_type_t peer_type = connection->get_peer_type();
+  comp_meta.con_mode =
+    static_cast<Compressor::CompressionMode>(
+      messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure()));
+  const auto preferred_methods = messenger->comp_registry.get_methods(peer_type);
+  auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods);
+
+  INTERCEPT(19);
+  return WRITE(comp_req_frame, "compression request", read_frame);
+}
+
+CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
+  if (state != COMPRESSION_CONNECTING) {
+    lderr(cct) << __func__ << " state changed!" << dendl;
+    return _fault();
+  }
+
+  auto response = CompressionDoneFrame::Decode(payload);
+  ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress()
+                << ", method=" << response.method() << ")" << dendl;
+
+  comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(response.method());
+  if (comp_meta.is_compress() != response.is_compress()) {
+    comp_meta.con_mode = Compressor::COMP_NONE;
+  }
+  session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+    cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+  return start_session_connect();
+}
+
 /* Server Protocol Methods */
 
 CtPtr ProtocolV2::start_server_banner_exchange() {
@@ -2250,10 +2321,8 @@ CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
   }
 
   if (state == AUTH_ACCEPTING_SIGN) {
-    // server had sent AuthDone and client responded with correct pre-auth
-    // signature. we can start accepting new sessions/reconnects.
-    state = SESSION_ACCEPTING;
-    return CONTINUE(read_frame);
+    // this happened on server side
+    return finish_server_auth();
   } else if (state == AUTH_CONNECTING_SIGN) {
     // this happened at client side
     return finish_client_auth();
@@ -2664,7 +2733,9 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   // this happens in the event center's thread as there should be
   // no user outside its boundaries (simlarly to e.g. outgoing_bl).
   auto temp_stream_handlers = std::move(session_stream_handlers);
+  auto temp_compression_handlers = std::move(session_compression_handlers);
   exproto->auth_meta = auth_meta;
+  exproto->comp_meta = comp_meta;
 
   ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                            << dendl;
@@ -2693,7 +2764,8 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
         new_worker,
         new_center,
         exproto,
-        temp_stream_handlers=std::move(temp_stream_handlers)
+        temp_stream_handlers=std::move(temp_stream_handlers),
+        temp_compression_handlers=std::move(temp_compression_handlers)
       ](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
@@ -2708,6 +2780,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
           existing->outgoing_bl.clear();
           existing->open_write = false;
           exproto->session_stream_handlers = std::move(temp_stream_handlers);
+          exproto->session_compression_handlers = std::move(temp_compression_handlers);
           existing->write_lock.unlock();
           if (exproto->state == NONE) {
             existing->shutdown_socket();
@@ -2895,3 +2968,43 @@ CtPtr ProtocolV2::send_reconnect_ok() {
 
   return WRITE(reconnect_ok, "reconnect ok", server_ready);
 }
+
+
+CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
+  if (state != COMPRESSION_ACCEPTING) {
+    lderr(cct) << __func__ << " state changed!" << dendl;
+    return _fault();
+  }
+
+  auto request = CompressionRequestFrame::Decode(payload);
+  ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
+                << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
+
+  const int peer_type = connection->get_peer_type();
+  if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
+        peer_type, auth_meta->is_mode_secure());
+      mode != Compressor::COMP_NONE && request.is_compress()) {
+    comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods());
+    if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
+      comp_meta.con_mode = mode;
+    }
+  } else {
+    comp_meta.con_method = Compressor::COMP_ALG_NONE;
+  }
+  
+  auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method());
+
+  INTERCEPT(20);
+  return WRITE(response, "compression done", finish_compression);
+}
+
+CtPtr ProtocolV2::finish_compression() {
+  // TODO: having a possibility to check whether we're server or client could
+  // allow reusing finish_compression().
+  
+  session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+    cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+  state = SESSION_ACCEPTING;
+  return CONTINUE(read_frame);
+}