]>
git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/FrameAssemblerV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "FrameAssemblerV2.h"
7 #include "SocketConnection.h"
9 #ifdef UNIT_TESTS_BUILT
10 #include "Interceptor.h"
13 using ceph::msgr::v2::FrameAssembler
;
14 using ceph::msgr::v2::FrameError
;
15 using ceph::msgr::v2::preamble_block_t
;
16 using ceph::msgr::v2::segment_t
;
17 using ceph::msgr::v2::Tag
;
// File-local helper: returns the seastar logger that crimson::get_logger()
// provides for the ceph_subsys_ms subsystem. The logging calls in this file
// go through it.
21 seastar::logger
& logger() {
22 return crimson::get_logger(ceph_subsys_ms
);
25 } // namespace anonymous
27 namespace crimson::net
{
// Constructor taking the SocketConnection this assembler works for.
// NOTE(review): the member-initializer list and body (original lines 30-32)
// are not visible in this view.
29 FrameAssemblerV2::FrameAssemblerV2(SocketConnection
&_conn
)
33 #ifdef UNIT_TESTS_BUILT
34 // should be kept consistent with intercept() in ProtocolV2.cc
//
// Test-only hook (compiled under UNIT_TESTS_BUILT). If the connection has an
// interceptor, asks it for the action to take at the breakpoint {tag, type}
// and passes that action to the socket through set_trap().
//   tag      - frame tag that identifies the breakpoint
//   is_write - true selects bp_type_t::WRITE, false selects bp_type_t::READ
// NOTE(review): original lines 36-37 and 43-45 are not visible in this view.
35 void FrameAssemblerV2::intercept_frame(Tag tag
, bool is_write
)
// nothing is done when no interceptor is attached to the connection
38 if (conn
.interceptor
) {
39 auto type
= is_write
? bp_type_t::WRITE
: bp_type_t::READ
;
40 auto action
= conn
.interceptor
->intercept(
41 conn
, Breakpoint
{tag
, type
});
// the socket receives the action together with the interceptor's blocker
42 socket
->set_trap(type
, action
, &conn
.interceptor
->blocker
);
// Forwards the rev1 flag to both the transmit and the receive frame
// assemblers so that they agree on the frame format revision.
// NOTE(review): original lines 48-49 are not visible in this view;
// presumably the flag is also stored in this object -- confirm.
47 void FrameAssemblerV2::set_is_rev1(bool _is_rev1
)
50 tx_frame_asm
.set_is_rev1(_is_rev1
);
51 rx_frame_asm
.set_is_rev1(_is_rev1
);
// Replaces session_stream_handlers with the pair returned by
// ceph::crypto::onwire::rxtx_t::create_handler_pair(), called with
// (nullptr, auth_meta, is_rev1, crossed).
// NOTE(review): the declaration of `crossed` (original line 56) is not
// visible in this view.
54 void FrameAssemblerV2::create_session_stream_handlers(
55 const AuthConnectionMeta
&auth_meta
,
58 session_stream_handlers
= ceph::crypto::onwire::rxtx_t::create_handler_pair(
59 nullptr, auth_meta
, is_rev1
, crossed
);
// Drops the current session handlers: both session_stream_handlers and
// session_comp_handlers are reset to { nullptr, nullptr }.
62 void FrameAssemblerV2::reset_handlers()
64 session_stream_handlers
= { nullptr, nullptr };
65 session_comp_handlers
= { nullptr, nullptr };
// Gives up this assembler's socket and session handlers: conn.socket,
// session_stream_handlers and session_comp_handlers are moved into a braced
// initializer (presumably the returned mover_t -- original lines 72-73 are
// not visible in this view).
// Precondition (asserted): is_socket_valid().
68 FrameAssemblerV2::mover_t
69 FrameAssemblerV2::to_replace()
71 assert(is_socket_valid());
74 std::move(conn
.socket
),
75 std::move(session_stream_handlers
),
76 std::move(session_comp_handlers
)};
// Takes over the state carried by `mover`: both handler pairs are
// move-assigned from it, and its socket is installed in one of two ways:
//   - replace_shutdown_socket(), whose future is returned, or
//   - set_socket(), followed by an already-resolved future.
// NOTE(review): the condition choosing between the two paths (original lines
// 86 and 88) and original lines 80-83 are not visible in this view.
79 seastar::future
<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t
&&mover
)
84 session_stream_handlers
= std::move(mover
.session_stream_handlers
);
85 session_comp_handlers
= std::move(mover
.session_comp_handlers
);
87 return replace_shutdown_socket(std::move(mover
.socket
));
89 set_socket(std::move(mover
.socket
));
90 return seastar::now();
// NOTE(review): the body (original lines 95-99) is not visible in this view.
// Presumably it turns on the record_io flag that stop_recording() asserts --
// confirm.
94 void FrameAssemblerV2::start_recording()
// Hands the recorded buffers to the caller: returns a record_bufs_t built
// from rxbuf and txbuf, both moved out of this object.
// Precondition (always asserted): record_io is true.
// NOTE(review): original line 105 is not visible in this view.
101 FrameAssemblerV2::record_bufs_t
102 FrameAssemblerV2::stop_recording()
104 ceph_assert_always(record_io
== true);
106 return record_bufs_t
{std::move(rxbuf
), std::move(txbuf
)};
// Returns true when `socket` is non-null.
109 bool FrameAssemblerV2::has_socket() const
// invariant: `socket` and conn.socket are either both set or both empty
111 assert((socket
&& conn
.socket
) || (!socket
&& !conn
.socket
));
112 return socket
!= nullptr;
// Returns true when a socket is present and it does not report
// is_shutdown().
115 bool FrameAssemblerV2::is_socket_valid() const
117 return has_socket() && !socket
->is_shutdown();
// Installs new_socket: `socket` is set to new_socket.get() and new_socket is
// then moved into conn.socket.
// Precondition (asserted): no socket is currently set.
// Postcondition (asserted): is_socket_valid().
120 void FrameAssemblerV2::set_socket(SocketRef
&&new_socket
)
122 assert(!has_socket());
// take the pointer before new_socket is moved from below
123 socket
= new_socket
.get();
124 conn
.socket
= std::move(new_socket
);
125 assert(is_socket_valid());
// Forwards `port` to the socket's learn_ephemeral_port_as_connector().
// Precondition (asserted): has_socket().
128 void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port
)
130 assert(has_socket());
131 socket
->learn_ephemeral_port_as_connector(port
);
// Precondition (asserted): is_socket_valid().
// NOTE(review): the statement after the assert (original line 137) is not
// visible in this view; presumably it shuts the socket down -- confirm.
134 void FrameAssemblerV2::shutdown_socket()
136 assert(is_socket_valid());
// Swaps a socket that is already shut down for new_socket, then closes the
// old one. The returned future chains on the old socket's close().
// Preconditions (asserted): a socket is present and reports is_shutdown().
// NOTE(review): original line 144 is not visible in this view.
140 seastar::future
<> FrameAssemblerV2::replace_shutdown_socket(SocketRef
&&new_socket
)
142 assert(has_socket());
143 assert(socket
->is_shutdown());
145 auto old_socket
= std::move(conn
.socket
);
146 set_socket(std::move(new_socket
));
// old_socket is moved into the continuation's capture, presumably so that it
// stays alive until close() has finished
147 return old_socket
->close(
148 ).then([sock
= std::move(old_socket
)] {});
// Closes the current socket and returns the future from socket->close().
// Preconditions (asserted): a socket is present and reports is_shutdown().
151 seastar::future
<> FrameAssemblerV2::close_shutdown_socket()
153 assert(has_socket());
154 assert(socket
->is_shutdown());
155 return socket
->close();
// Reads `bytes` bytes through socket->read_exactly().
// While record_io is set, a shared reference to the received buffer
// (bl.share()) is also appended to rxbuf; otherwise the socket's future is
// returned directly.
// Precondition (asserted): has_socket().
// NOTE(review): the end of the recording continuation (original lines
// 166-168) is not visible in this view.
158 seastar::future
<Socket::tmp_buf
>
159 FrameAssemblerV2::read_exactly(std::size_t bytes
)
161 assert(has_socket());
// recording is marked unlikely
162 if (unlikely(record_io
)) {
163 return socket
->read_exactly(bytes
164 ).then([this](auto bl
) {
165 rxbuf
.append(buffer::create(bl
.share()));
169 return socket
->read_exactly(bytes
);
// Reads through socket->read(bytes) and yields a ceph::bufferlist.
// While record_io is set, the read goes through a continuation; otherwise
// the socket's future is returned directly.
// Precondition (asserted): has_socket().
// NOTE(review): the body of the recording continuation (original lines
// 180-183) is not visible in this view; presumably it appends the data to
// rxbuf as read_exactly() does -- confirm.
173 seastar::future
<ceph::bufferlist
>
174 FrameAssemblerV2::read(std::size_t bytes
)
176 assert(has_socket());
177 if (unlikely(record_io
)) {
178 return socket
->read(bytes
179 ).then([this](auto buf
) {
184 return socket
->read(bytes
);
// Passes `buf` to socket->write() and returns its result.
// Precondition (asserted): has_socket().
// NOTE(review): the return-type line (original line 188) and the body of the
// record_io branch (original lines 193-194) are not visible in this view;
// presumably the branch records the outgoing data in txbuf -- confirm.
189 FrameAssemblerV2::write(ceph::bufferlist
&&buf
)
191 assert(has_socket());
192 if (unlikely(record_io
)) {
195 return socket
->write(std::move(buf
));
// Returns the result of socket->flush().
// Precondition (asserted): has_socket().
// NOTE(review): the return-type line (original line 198) is not visible in
// this view.
199 FrameAssemblerV2::flush()
201 assert(has_socket());
202 return socket
->flush();
// Passes `buf` to socket->write_flush() and returns its result.
// Precondition (asserted): has_socket().
// NOTE(review): the return-type line (original line 205) and the body of the
// record_io branch (original lines 210-211) are not visible in this view;
// presumably the branch records the outgoing data in txbuf -- confirm.
206 FrameAssemblerV2::write_flush(ceph::bufferlist
&&buf
)
208 assert(has_socket());
209 if (unlikely(record_io
)) {
212 return socket
->write_flush(std::move(buf
));
// Reads the main preamble of the next frame and decodes its tag.
// Reads rx_frame_asm.get_preamble_onwire_len() bytes, appends them to
// rx_preamble and lets rx_frame_asm.disassemble_preamble() extract the tag.
// Resolves to read_main_t{tag, &rx_frame_asm}.
// A FrameError is logged as a warning and rethrown as std::system_error
// carrying crimson::net::error::negotiation_failure.
// NOTE(review): original lines 217-218, 221 and 226 are not visible in this
// view.
215 seastar::future
<FrameAssemblerV2::read_main_t
>
216 FrameAssemblerV2::read_main_preamble()
219 return read_exactly(rx_frame_asm
.get_preamble_onwire_len()
220 ).then([this](auto bl
) {
222 rx_preamble
.append(buffer::create(std::move(bl
)));
223 const Tag tag
= rx_frame_asm
.disassemble_preamble(rx_preamble
);
// unit-test builds pass every received tag to the interceptor as a READ
224 #ifdef UNIT_TESTS_BUILT
225 intercept_frame(tag
, false);
227 return read_main_t
{tag
, &rx_frame_asm
};
228 } catch (FrameError
& e
) {
229 logger().warn("{} read_main_preamble: {}", conn
, e
.what());
230 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
// Reads the segments and the epilogue of the frame whose preamble is held in
// rx_preamble / rx_frame_asm.
// Visible steps:
//   1. clear rx_segments_data;
//   2. until it holds rx_frame_asm.get_num_segments() entries, read each
//      segment's on-wire length and append the data as a new entry;
//   3. read the epilogue and call rx_frame_asm.disassemble_segments();
//   4. resolve to a pointer to rx_segments_data.
// FrameError and ceph::crypto::onwire::MsgAuthError are logged as errors and
// rethrown as std::system_error carrying
// crimson::net::error::negotiation_failure.
// NOTE(review): several original lines (237, 240, 242-243, 250, 257,
// 260-262, 266-267, 277, 281-283, 285-286) are not visible in this view,
// including how the `ok` result of disassemble_segments() is handled.
235 seastar::future
<FrameAssemblerV2::read_payload_t
*>
236 FrameAssemblerV2::read_frame_payload()
238 rx_segments_data
.clear();
239 return seastar::do_until(
// stop condition: one entry has been collected per segment
241 return rx_frame_asm
.get_num_segments() == rx_segments_data
.size();
244 // TODO: create aligned and contiguous buffer from socket
// the next segment to read has the index of the next free slot
245 const size_t seg_idx
= rx_segments_data
.size();
// a non-default alignment request is only reported in the trace log here
246 if (uint16_t alignment
= rx_frame_asm
.get_segment_align(seg_idx
);
247 alignment
!= segment_t::DEFAULT_ALIGNMENT
) {
248 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
249 conn
, alignment
, rx_segments_data
.size());
251 uint32_t onwire_len
= rx_frame_asm
.get_segment_onwire_len(seg_idx
);
252 // TODO: create aligned and contiguous buffer from socket
253 return read_exactly(onwire_len
254 ).then([this](auto tmp_bl
) {
255 logger().trace("{} RECV({}) frame segment[{}]",
256 conn
, tmp_bl
.size(), rx_segments_data
.size());
258 segment
.append(buffer::create(std::move(tmp_bl
)));
259 rx_segments_data
.emplace_back(std::move(segment
));
// all segments have been read; the epilogue follows
263 return read_exactly(rx_frame_asm
.get_epilogue_onwire_len());
264 }).then([this](auto bl
) {
265 logger().trace("{} RECV({}) frame epilogue", conn
, bl
.size());
268 bufferlist rx_epilogue
;
269 rx_epilogue
.append(buffer::create(std::move(bl
)));
270 ok
= rx_frame_asm
.disassemble_segments(rx_preamble
, rx_segments_data
.data(), rx_epilogue
);
271 } catch (FrameError
& e
) {
272 logger().error("read_frame_payload: {} {}", conn
, e
.what());
273 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
274 } catch (ceph::crypto::onwire::MsgAuthError
&) {
275 logger().error("read_frame_payload: {} bad auth tag", conn
);
276 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
278 // There is a mechanism that allows the transmitter to start sending a
279 // message and abort after putting the entire data field on the wire. This
280 // will be used by the kernel client to avoid unnecessary buffering.
284 return &rx_segments_data
;
// Trace-logs an outgoing frame: the connection, the length of `bl`, and the
// tag, num_segments and crc fields of its preamble.
//   bl - buffer list whose first buffer is read as a preamble_block_t
// NOTE(review): assumes the first buffer of `bl` holds at least one whole
// preamble_block_t; no size check is visible here -- confirm with callers.
288 void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist
&bl
)
// the bytes at the front of the list are viewed as the preamble structure
290 const auto main_preamble
=
291 reinterpret_cast<const preamble_block_t
*>(bl
.front().c_str());
292 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
293 conn
, bl
.length(), (int)main_preamble
->tag
,
294 (int)main_preamble
->num_segments
, main_preamble
->crc
);
// Factory: returns a new FrameAssemblerV2 for `conn`, allocated with
// std::make_unique.
297 FrameAssemblerV2Ref
FrameAssemblerV2::create(SocketConnection
&conn
)
299 return std::make_unique
<FrameAssemblerV2
>(conn
);
302 } // namespace crimson::net