]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/FrameAssemblerV2.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / net / FrameAssemblerV2.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "FrameAssemblerV2.h"
5
6 #include "Errors.h"
7 #include "SocketConnection.h"
8
9 #ifdef UNIT_TESTS_BUILT
10 #include "Interceptor.h"
11 #endif
12
13 using ceph::msgr::v2::FrameAssembler;
14 using ceph::msgr::v2::FrameError;
15 using ceph::msgr::v2::preamble_block_t;
16 using ceph::msgr::v2::segment_t;
17 using ceph::msgr::v2::Tag;
18
19 namespace {
20
21 seastar::logger& logger() {
22 return crimson::get_logger(ceph_subsys_ms);
23 }
24
25 } // namespace anonymous
26
27 namespace crimson::net {
28
29 FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
30 : conn{_conn}
31 {}
32
#ifdef UNIT_TESTS_BUILT
// should be consistent to intercept() in ProtocolV2.cc
//
// Only compiled when UNIT_TESTS_BUILT is defined. If the connection has an
// interceptor, ask it which action to take for the breakpoint {tag, READ or
// WRITE} and install that action as a trap on the socket, together with the
// interceptor's blocker. Without an interceptor this is a no-op.
void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
{
  assert(has_socket());
  if (!conn.interceptor) {
    return;
  }
  auto bp_type = bp_type_t::READ;
  if (is_write) {
    bp_type = bp_type_t::WRITE;
  }
  auto bp_action = conn.interceptor->intercept(
    conn, Breakpoint{tag, bp_type});
  socket->set_trap(bp_type, bp_action, &conn.interceptor->blocker);
}
#endif
46
47 void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
48 {
49 is_rev1 = _is_rev1;
50 tx_frame_asm.set_is_rev1(_is_rev1);
51 rx_frame_asm.set_is_rev1(_is_rev1);
52 }
53
54 void FrameAssemblerV2::create_session_stream_handlers(
55 const AuthConnectionMeta &auth_meta,
56 bool crossed)
57 {
58 session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
59 nullptr, auth_meta, is_rev1, crossed);
60 }
61
62 void FrameAssemblerV2::reset_handlers()
63 {
64 session_stream_handlers = { nullptr, nullptr };
65 session_comp_handlers = { nullptr, nullptr };
66 }
67
68 FrameAssemblerV2::mover_t
69 FrameAssemblerV2::to_replace()
70 {
71 assert(is_socket_valid());
72 socket = nullptr;
73 return mover_t{
74 std::move(conn.socket),
75 std::move(session_stream_handlers),
76 std::move(session_comp_handlers)};
77 }
78
79 seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
80 {
81 record_io = false;
82 rxbuf.clear();
83 txbuf.clear();
84 session_stream_handlers = std::move(mover.session_stream_handlers);
85 session_comp_handlers = std::move(mover.session_comp_handlers);
86 if (has_socket()) {
87 return replace_shutdown_socket(std::move(mover.socket));
88 } else {
89 set_socket(std::move(mover.socket));
90 return seastar::now();
91 }
92 }
93
94 void FrameAssemblerV2::start_recording()
95 {
96 record_io = true;
97 rxbuf.clear();
98 txbuf.clear();
99 }
100
101 FrameAssemblerV2::record_bufs_t
102 FrameAssemblerV2::stop_recording()
103 {
104 ceph_assert_always(record_io == true);
105 record_io = false;
106 return record_bufs_t{std::move(rxbuf), std::move(txbuf)};
107 }
108
109 bool FrameAssemblerV2::has_socket() const
110 {
111 assert((socket && conn.socket) || (!socket && !conn.socket));
112 return socket != nullptr;
113 }
114
115 bool FrameAssemblerV2::is_socket_valid() const
116 {
117 return has_socket() && !socket->is_shutdown();
118 }
119
120 void FrameAssemblerV2::set_socket(SocketRef &&new_socket)
121 {
122 assert(!has_socket());
123 socket = new_socket.get();
124 conn.socket = std::move(new_socket);
125 assert(is_socket_valid());
126 }
127
128 void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
129 {
130 assert(has_socket());
131 socket->learn_ephemeral_port_as_connector(port);
132 }
133
134 void FrameAssemblerV2::shutdown_socket()
135 {
136 assert(is_socket_valid());
137 socket->shutdown();
138 }
139
140 seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket)
141 {
142 assert(has_socket());
143 assert(socket->is_shutdown());
144 socket = nullptr;
145 auto old_socket = std::move(conn.socket);
146 set_socket(std::move(new_socket));
147 return old_socket->close(
148 ).then([sock = std::move(old_socket)] {});
149 }
150
151 seastar::future<> FrameAssemblerV2::close_shutdown_socket()
152 {
153 assert(has_socket());
154 assert(socket->is_shutdown());
155 return socket->close();
156 }
157
158 seastar::future<Socket::tmp_buf>
159 FrameAssemblerV2::read_exactly(std::size_t bytes)
160 {
161 assert(has_socket());
162 if (unlikely(record_io)) {
163 return socket->read_exactly(bytes
164 ).then([this](auto bl) {
165 rxbuf.append(buffer::create(bl.share()));
166 return bl;
167 });
168 } else {
169 return socket->read_exactly(bytes);
170 };
171 }
172
173 seastar::future<ceph::bufferlist>
174 FrameAssemblerV2::read(std::size_t bytes)
175 {
176 assert(has_socket());
177 if (unlikely(record_io)) {
178 return socket->read(bytes
179 ).then([this](auto buf) {
180 rxbuf.append(buf);
181 return buf;
182 });
183 } else {
184 return socket->read(bytes);
185 }
186 }
187
188 seastar::future<>
189 FrameAssemblerV2::write(ceph::bufferlist &&buf)
190 {
191 assert(has_socket());
192 if (unlikely(record_io)) {
193 txbuf.append(buf);
194 }
195 return socket->write(std::move(buf));
196 }
197
198 seastar::future<>
199 FrameAssemblerV2::flush()
200 {
201 assert(has_socket());
202 return socket->flush();
203 }
204
205 seastar::future<>
206 FrameAssemblerV2::write_flush(ceph::bufferlist &&buf)
207 {
208 assert(has_socket());
209 if (unlikely(record_io)) {
210 txbuf.append(buf);
211 }
212 return socket->write_flush(std::move(buf));
213 }
214
215 seastar::future<FrameAssemblerV2::read_main_t>
216 FrameAssemblerV2::read_main_preamble()
217 {
218 rx_preamble.clear();
219 return read_exactly(rx_frame_asm.get_preamble_onwire_len()
220 ).then([this](auto bl) {
221 try {
222 rx_preamble.append(buffer::create(std::move(bl)));
223 const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
224 #ifdef UNIT_TESTS_BUILT
225 intercept_frame(tag, false);
226 #endif
227 return read_main_t{tag, &rx_frame_asm};
228 } catch (FrameError& e) {
229 logger().warn("{} read_main_preamble: {}", conn, e.what());
230 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
231 }
232 });
233 }
234
235 seastar::future<FrameAssemblerV2::read_payload_t*>
236 FrameAssemblerV2::read_frame_payload()
237 {
238 rx_segments_data.clear();
239 return seastar::do_until(
240 [this] {
241 return rx_frame_asm.get_num_segments() == rx_segments_data.size();
242 },
243 [this] {
244 // TODO: create aligned and contiguous buffer from socket
245 const size_t seg_idx = rx_segments_data.size();
246 if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
247 alignment != segment_t::DEFAULT_ALIGNMENT) {
248 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
249 conn, alignment, rx_segments_data.size());
250 }
251 uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
252 // TODO: create aligned and contiguous buffer from socket
253 return read_exactly(onwire_len
254 ).then([this](auto tmp_bl) {
255 logger().trace("{} RECV({}) frame segment[{}]",
256 conn, tmp_bl.size(), rx_segments_data.size());
257 bufferlist segment;
258 segment.append(buffer::create(std::move(tmp_bl)));
259 rx_segments_data.emplace_back(std::move(segment));
260 });
261 }
262 ).then([this] {
263 return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
264 }).then([this](auto bl) {
265 logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
266 bool ok = false;
267 try {
268 bufferlist rx_epilogue;
269 rx_epilogue.append(buffer::create(std::move(bl)));
270 ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
271 } catch (FrameError& e) {
272 logger().error("read_frame_payload: {} {}", conn, e.what());
273 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
274 } catch (ceph::crypto::onwire::MsgAuthError&) {
275 logger().error("read_frame_payload: {} bad auth tag", conn);
276 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
277 }
278 // we do have a mechanism that allows transmitter to start sending message
279 // and abort after putting entire data field on wire. This will be used by
280 // the kernel client to avoid unnecessary buffering.
281 if (!ok) {
282 ceph_abort("TODO");
283 }
284 return &rx_segments_data;
285 });
286 }
287
288 void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
289 {
290 const auto main_preamble =
291 reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
292 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
293 conn, bl.length(), (int)main_preamble->tag,
294 (int)main_preamble->num_segments, main_preamble->crc);
295 }
296
297 FrameAssemblerV2Ref FrameAssemblerV2::create(SocketConnection &conn)
298 {
299 return std::make_unique<FrameAssemblerV2>(conn);
300 }
301
302 } // namespace crimson::net