]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
7c673cae | 16 | #include "RDMAStack.h" |
7c673cae FG |
17 | |
18 | #define dout_subsys ceph_subsys_ms | |
19 | #undef dout_prefix | |
20 | #define dout_prefix *_dout << " RDMAConnectedSocketImpl " | |
21 | ||
7c673cae | 22 | RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, |
31f18b77 FG |
23 | RDMAWorker *w) |
24 | : cct(cct), connected(0), error(0), infiniband(ib), | |
25 | dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), | |
26 | is_server(false), con_handler(new C_handle_connection(this)), | |
224ce89b | 27 | active(false), pending(false) |
7c673cae | 28 | { |
11fdf7f2 TL |
29 | if (!cct->_conf->ms_async_rdma_cm) { |
30 | qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL); | |
31 | my_msg.qpn = qp->get_local_qp_number(); | |
32 | my_msg.psn = qp->get_initial_psn(); | |
33 | my_msg.lid = infiniband->get_lid(); | |
34 | my_msg.peer_qpn = 0; | |
35 | my_msg.gid = infiniband->get_gid(); | |
36 | notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); | |
37 | dispatcher->register_qp(qp, this); | |
38 | dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); | |
39 | dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); | |
40 | } | |
7c673cae FG |
41 | } |
42 | ||
7c673cae FG |
43 | RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() |
44 | { | |
45 | ldout(cct, 20) << __func__ << " destruct." << dendl; | |
31f18b77 | 46 | cleanup(); |
7c673cae | 47 | worker->remove_pending_conn(this); |
31f18b77 | 48 | dispatcher->erase_qpn(my_msg.qpn); |
11fdf7f2 TL |
49 | |
50 | for (unsigned i=0; i < wc.size(); ++i) { | |
51 | dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id)); | |
52 | } | |
53 | for (unsigned i=0; i < buffers.size(); ++i) { | |
54 | dispatcher->post_chunk_to_pool(buffers[i]); | |
55 | } | |
56 | ||
7c673cae FG |
57 | Mutex::Locker l(lock); |
58 | if (notify_fd >= 0) | |
59 | ::close(notify_fd); | |
31f18b77 FG |
60 | if (tcp_fd >= 0) |
61 | ::close(tcp_fd); | |
7c673cae | 62 | error = ECONNRESET; |
7c673cae FG |
63 | } |
64 | ||
65 | void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v) | |
66 | { | |
67 | Mutex::Locker l(lock); | |
68 | if (wc.empty()) | |
69 | wc = std::move(v); | |
70 | else | |
71 | wc.insert(wc.end(), v.begin(), v.end()); | |
72 | notify(); | |
73 | } | |
74 | ||
75 | void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w) | |
76 | { | |
77 | Mutex::Locker l(lock); | |
78 | if (wc.empty()) | |
79 | return ; | |
80 | w.swap(wc); | |
81 | } | |
82 | ||
31f18b77 | 83 | int RDMAConnectedSocketImpl::activate() |
7c673cae FG |
84 | { |
85 | ibv_qp_attr qpa; | |
86 | int r; | |
87 | ||
7c673cae FG |
88 | // now connect up the qps and switch to RTR |
89 | memset(&qpa, 0, sizeof(qpa)); | |
90 | qpa.qp_state = IBV_QPS_RTR; | |
91 | qpa.path_mtu = IBV_MTU_1024; | |
92 | qpa.dest_qp_num = peer_msg.qpn; | |
93 | qpa.rq_psn = peer_msg.psn; | |
94 | qpa.max_dest_rd_atomic = 1; | |
95 | qpa.min_rnr_timer = 12; | |
96 | //qpa.ah_attr.is_global = 0; | |
97 | qpa.ah_attr.is_global = 1; | |
98 | qpa.ah_attr.grh.hop_limit = 6; | |
99 | qpa.ah_attr.grh.dgid = peer_msg.gid; | |
100 | ||
31f18b77 | 101 | qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); |
7c673cae FG |
102 | |
103 | qpa.ah_attr.dlid = peer_msg.lid; | |
104 | qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; | |
31f18b77 | 105 | qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; |
7c673cae | 106 | qpa.ah_attr.src_path_bits = 0; |
31f18b77 | 107 | qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); |
7c673cae FG |
108 | |
109 | ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; | |
110 | ||
7c673cae FG |
111 | r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | |
112 | IBV_QP_AV | | |
113 | IBV_QP_PATH_MTU | | |
114 | IBV_QP_DEST_QPN | | |
115 | IBV_QP_RQ_PSN | | |
116 | IBV_QP_MIN_RNR_TIMER | | |
117 | IBV_QP_MAX_DEST_RD_ATOMIC); | |
118 | if (r) { | |
119 | lderr(cct) << __func__ << " failed to transition to RTR state: " | |
120 | << cpp_strerror(errno) << dendl; | |
121 | return -1; | |
122 | } | |
123 | ||
124 | ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; | |
125 | ||
126 | // now move to RTS | |
127 | qpa.qp_state = IBV_QPS_RTS; | |
128 | ||
129 | // How long to wait before retrying if packet lost or server dead. | |
130 | // Supposedly the timeout is 4.096us*2^timeout. However, the actual | |
131 | // timeout appears to be 4.096us*2^(timeout+1), so the setting | |
132 | // below creates a 135ms timeout. | |
133 | qpa.timeout = 14; | |
134 | ||
135 | // How many times to retry after timeouts before giving up. | |
136 | qpa.retry_cnt = 7; | |
137 | ||
138 | // How many times to retry after RNR (receiver not ready) condition | |
139 | // before giving up. Occurs when the remote side has not yet posted | |
140 | // a receive request. | |
141 | qpa.rnr_retry = 7; // 7 is infinite retry. | |
142 | qpa.sq_psn = my_msg.psn; | |
143 | qpa.max_rd_atomic = 1; | |
144 | ||
145 | r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | | |
146 | IBV_QP_TIMEOUT | | |
147 | IBV_QP_RETRY_CNT | | |
148 | IBV_QP_RNR_RETRY | | |
149 | IBV_QP_SQ_PSN | | |
150 | IBV_QP_MAX_QP_RD_ATOMIC); | |
151 | if (r) { | |
152 | lderr(cct) << __func__ << " failed to transition to RTS state: " | |
153 | << cpp_strerror(errno) << dendl; | |
154 | return -1; | |
155 | } | |
156 | ||
157 | // the queue pair should be ready to use once the client has finished | |
158 | // setting up their end. | |
159 | ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; | |
160 | ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; | |
161 | ||
162 | if (!is_server) { | |
163 | connected = 1; //indicate successfully | |
31f18b77 FG |
164 | ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; |
165 | submit(false); | |
7c673cae FG |
166 | } |
167 | active = true; | |
168 | ||
169 | return 0; | |
170 | } | |
171 | ||
31f18b77 | 172 | int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { |
7c673cae FG |
173 | ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" |
174 | << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; | |
175 | NetHandler net(cct); | |
176 | tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); | |
177 | ||
178 | if (tcp_fd < 0) { | |
179 | return -errno; | |
180 | } | |
7c673cae FG |
181 | |
182 | int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); | |
183 | if (r < 0) { | |
184 | ::close(tcp_fd); | |
185 | tcp_fd = -1; | |
186 | return -errno; | |
187 | } | |
188 | ||
189 | ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; | |
190 | net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); | |
191 | my_msg.peer_qpn = 0; | |
31f18b77 | 192 | r = infiniband->send_msg(cct, tcp_fd, my_msg); |
7c673cae FG |
193 | if (r < 0) |
194 | return r; | |
195 | ||
196 | worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); | |
197 | return 0; | |
198 | } | |
199 | ||
31f18b77 FG |
// Readable-event callback for tcp_fd. It drives the TCP exchange of
// IBSYNMsg handshake messages that sets up the RDMA QP.
//
// - Client (is_server == false): the message is the server's reply. Record
//   the server's QP number in my_msg.peer_qpn, activate the QP, wake the
//   reader, then send our message back as the final ack.
// - Server, peer_msg.peer_qpn == 0: the client's initial "syn". Activate the
//   QP (unless already active) and reply with our own message.
// - Server, peer_msg.peer_qpn != 0: the client's ack. Mark connected, flush
//   queued sends via submit(false), and wake the reader.
//
// Receive/send failures (other than -EAGAIN on receive) bump the handshake
// error counter and call fault(). Receiving a message after `connected` is
// already 1 is treated as a protocol error.
void RDMAConnectedSocketImpl::handle_connection() {
  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
  if (r <= 0) {
    // -EAGAIN: no complete message yet, wait for the next readable event.
    if (r != -EAGAIN) {
      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
      ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
      fault();
    }
    return;
  }

  // A handshake message after the connection is established is unexpected.
  if (1 == connected) {
    ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
    fault();
    return;
  }

  if (!is_server) {// syn + ack from server
    my_msg.peer_qpn = peer_msg.qpn;
    ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn
                   << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
    // NOTE(review): `connected` was checked against 1 above, so this is
    // true whenever connected only takes the values 0/1.
    if (!connected) {
      // activate() failure aborts the process rather than faulting the socket.
      r = activate();
      ceph_assert(!r);
    }
    notify();
    // Final ack to the server: my_msg now carries a non-zero peer_qpn.
    r = infiniband->send_msg(cct, tcp_fd, my_msg);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
      fault();
    }
  } else {
    if (peer_msg.peer_qpn == 0) {// syn from client
      if (active) {
        ldout(cct, 10) << __func__ << " server is already active." << dendl;
        return ;
      }
      r = activate();
      ceph_assert(!r);
      r = infiniband->send_msg(cct, tcp_fd, my_msg);
      if (r < 0) {
        ldout(cct, 1) << __func__ << " server ack failed." << dendl;
        dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
        fault();
        return ;
      }
    } else { // ack from client
      connected = 1;
      ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
      //cleanup();
      // Flush anything send() queued while the handshake was in progress.
      submit(false);
      notify();
    }
  }
}
257 | ||
7c673cae FG |
// Non-blocking read of up to `len` bytes into `buf`.
//
// Data is taken first from chunks left over by earlier calls (`buffers`,
// drained by read_buffers()), then from receive completions fetched via
// get_wc(). A completion with byte_len == 0 is the peer's close message
// (compare fin()). Once connected, it records error = ECONNRESET.
// Chunks that do not fully fit in `buf` are parked in `buffers`, and
// notify() is called so the caller gets woken again to drain them.
//
// Returns the number of bytes copied. Returns -EAGAIN when nothing is
// available, or when the QP is not active or not yet connected. Returns
// -error when nothing was read and an error is recorded.
ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
{
  // Drain the eventfd counter; the value and result are only logged.
  uint64_t i = 0;
  int r = ::read(notify_fd, &i, sizeof(i));
  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;

  if (!active) {
    ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
    return -EAGAIN;
  }

  if (0 == connected) {
    ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
    return -EAGAIN;
  }
  // Serve leftovers from previous calls first to preserve byte order.
  ssize_t read = 0;
  if (!buffers.empty())
    read = read_buffers(buf,len);

  std::vector<ibv_wc> cqe;
  get_wc(cqe);
  if (cqe.empty()) {
    // Still data parked: make sure the caller is woken up again.
    if (!buffers.empty()) {
      notify();
    }
    if (read > 0) {
      return read;
    }
    if (error) {
      return -error;
    } else {
      return -EAGAIN;
    }
  }

  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
  for (size_t i = 0; i < cqe.size(); ++i) {
    ibv_wc* response = &cqe[i];
    // Any failed receive completion is fatal here.
    ceph_assert(response->status == IBV_WC_SUCCESS);
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
    chunk->prepare_read(response->byte_len);
    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
    if (response->byte_len == 0) {
      // Zero-length message: peer is closing.
      dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
      if (connected) {
        error = ECONNRESET;
        ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
      }
      dispatcher->post_chunk_to_pool(chunk);
    } else {
      if (read == (ssize_t)len) {
        // Caller's buffer already full: keep the whole chunk for later.
        buffers.push_back(chunk);
        ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
      } else if (read + response->byte_len > (ssize_t)len) {
        // Only part fits: copy what fits, park the remainder.
        read += chunk->read(buf+read, (ssize_t)len-read);
        buffers.push_back(chunk);
        ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
      } else {
        // Whole chunk consumed: recycle it and retry any RQ post backlog.
        read += chunk->read(buf+read, response->byte_len);
        dispatcher->post_chunk_to_pool(chunk);
        update_post_backlog();
      }
    }
  }

  worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
  // NOTE(review): this function already returned above when connected == 0,
  // so this branch appears unreachable unless `connected` changes in between.
  if (is_server && connected == 0) {
    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
    connected = 1; //if so, we don't need the last handshake
    cleanup();
    submit(false);
  }

  if (!buffers.empty()) {
    notify();
  }

  if (read == 0 && error)
    return -error;
  return read == 0 ? -EAGAIN : read;
}
340 | ||
// Copy up to `len` bytes from the parked chunks in `buffers` into `buf`,
// oldest first.
//
// A chunk that has been fully consumed (over()) is returned to the
// dispatcher pool, and the RQ post backlog is retried. Consumed chunks are
// then erased from the front of `buffers`. A partially read chunk stays at
// the front. Returns the number of bytes copied (at most `len`).
ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
{
  size_t read = 0, tmp = 0;
  auto c = buffers.begin();
  for (; c != buffers.end() ; ++c) {
    tmp = (*c)->read(buf+read, len-read);
    read += tmp;
    ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
    if ((*c)->over()) {
      dispatcher->post_chunk_to_pool(*c);
      update_post_backlog();
      ldout(cct, 25) << __func__ << " one chunk over." << dendl;
    }
    if (read == len) {
      break;
    }
  }

  // If we stopped on a chunk that was also fully consumed, it must be
  // erased too. NOTE(review): that chunk was already handed to the pool
  // above (and update_post_backlog() may have run since), so over() here
  // reads a pooled chunk — confirm pooled chunks are not reset before this.
  if (c != buffers.end() && (*c)->over())
    ++c;
  buffers.erase(buffers.begin(), c);
  ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;
  return read;
}
365 | ||
366 | ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) | |
367 | { | |
368 | if (error) | |
369 | return -error; | |
370 | static const int MAX_COMPLETIONS = 16; | |
371 | ibv_wc wc[MAX_COMPLETIONS]; | |
372 | ssize_t size = 0; | |
373 | ||
374 | ibv_wc* response; | |
375 | Chunk* chunk; | |
376 | bool loaded = false; | |
377 | auto iter = buffers.begin(); | |
378 | if (iter != buffers.end()) { | |
379 | chunk = *iter; | |
380 | // FIXME need to handle release | |
381 | // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); | |
382 | buffers.erase(iter); | |
383 | loaded = true; | |
384 | size = chunk->bound; | |
385 | } | |
386 | ||
387 | std::vector<ibv_wc> cqe; | |
388 | get_wc(cqe); | |
389 | if (cqe.empty()) | |
390 | return size == 0 ? -EAGAIN : size; | |
391 | ||
392 | ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; | |
393 | ||
394 | for (size_t i = 0; i < cqe.size(); ++i) { | |
395 | response = &wc[i]; | |
396 | chunk = reinterpret_cast<Chunk*>(response->wr_id); | |
397 | chunk->prepare_read(response->byte_len); | |
398 | if (!loaded && i == 0) { | |
399 | // FIXME need to handle release | |
400 | // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); | |
401 | size = chunk->bound; | |
402 | continue; | |
403 | } | |
404 | buffers.push_back(chunk); | |
405 | iter++; | |
406 | } | |
407 | ||
408 | if (size == 0) | |
409 | return -EAGAIN; | |
410 | return size; | |
411 | } | |
412 | ||
413 | ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) | |
414 | { | |
415 | if (error) { | |
31f18b77 | 416 | if (!active) |
7c673cae FG |
417 | return -EPIPE; |
418 | return -error; | |
419 | } | |
420 | size_t bytes = bl.length(); | |
421 | if (!bytes) | |
422 | return 0; | |
423 | { | |
424 | Mutex::Locker l(lock); | |
425 | pending_bl.claim_append(bl); | |
31f18b77 FG |
426 | if (!connected) { |
427 | ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; | |
7c673cae FG |
428 | return bytes; |
429 | } | |
430 | } | |
31f18b77 | 431 | ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; |
7c673cae FG |
432 | ssize_t r = submit(more); |
433 | if (r < 0 && r != -EAGAIN) | |
434 | return r; | |
435 | return bytes; | |
436 | } | |
437 | ||
// Try to push everything in pending_bl onto the wire (under `lock`).
//
// pending_bl buffers that already live in registered TX memory
// (is_tx_buffer) are sent as-is, zero-copy. Runs of other buffers are
// copied into TX chunks reserved from the worker (fill_tx_via_copy). If the
// reservation runs short, sending stops at that point and whatever was
// staged so far is posted. Bytes that were not staged stay in pending_bl.
// `more` is not used by this implementation.
//
// Returns 0 when everything was posted, or -EAGAIN when nothing could be
// staged or some bytes remain. Returns -error if an error is recorded, or
// the negative result of post_work_request() if posting fails.
ssize_t RDMAConnectedSocketImpl::submit(bool more)
{
  if (error)
    return -error;
  Mutex::Locker l(lock);
  size_t bytes = pending_bl.length();
  ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
                 << pending_bl.buffers().size() << dendl;
  if (!bytes)
    return 0;

  // Reserve TX chunks for `bytes` bytes (appended to tx_buffers) and copy
  // the buffers in [start, end) into them, advancing `start`. Returns the
  // number of bytes copied. Returns 0 when no memory could be reserved, and
  // a short count when the reserved chunks fill up first.
  auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers,
                                 unsigned bytes,
                                 auto& start,
                                 const auto& end) -> unsigned {
    ceph_assert(start != end);
    // Index of the first chunk reserved by this call.
    auto chunk_idx = tx_buffers.size();
    int ret = worker->get_reged_mem(this, tx_buffers, bytes);
    if (ret == 0) {
      ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
      worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
      return 0;
    }

    unsigned total_copied = 0;
    Chunk *current_chunk = tx_buffers[chunk_idx];
    while (start != end) {
      const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
      unsigned copied = 0;
      while (copied < start->length()) {
        uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
        copied += r;
        total_copied += r;
        bytes -= r;
        if (current_chunk->full()){
          // Out of reserved chunks: report a partial copy.
          if (++chunk_idx == tx_buffers.size())
            return total_copied;
          current_chunk = tx_buffers[chunk_idx];
        }
      }
      ++start;
    }
    ceph_assert(bytes == 0);
    return total_copied;
  };

  std::vector<Chunk*> tx_buffers;
  auto it = std::cbegin(pending_bl.buffers());
  // copy_it marks the start of the run of buffers not yet staged.
  auto copy_it = it;
  unsigned total = 0;
  unsigned need_reserve_bytes = 0;
  while (it != pending_bl.buffers().end()) {
    if (infiniband->is_tx_buffer(it->raw_c_str())) {
      // Flush the pending copy run first so ordering is preserved.
      if (need_reserve_bytes) {
        unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
        total += copied;
        if (copied < need_reserve_bytes)
          goto sending;
        need_reserve_bytes = 0;
      }
      ceph_assert(copy_it == it);
      // Zero-copy: the buffer already is a registered TX chunk.
      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
      total += it->length();
      ++copy_it;
    } else {
      need_reserve_bytes += it->length();
    }
    ++it;
  }
  if (need_reserve_bytes)
    total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);

sending:
  if (total == 0)
    return -EAGAIN;
  ceph_assert(total <= pending_bl.length());
  // Keep only the unsent tail in pending_bl.
  bufferlist swapped;
  if (total < pending_bl.length()) {
    worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
    pending_bl.splice(total, pending_bl.length()-total, &swapped);
    pending_bl.swap(swapped);
  } else {
    pending_bl.clear();
  }

  ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
                 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;

  int r = post_work_request(tx_buffers);
  if (r < 0)
    return r;

  // Note: `bytes` is the amount pending on entry, not necessarily sent.
  ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
  return pending_bl.length() ? -EAGAIN : 0;
}
533 | ||
534 | int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers) | |
535 | { | |
31f18b77 | 536 | ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; |
7c673cae FG |
537 | vector<Chunk*>::iterator current_buffer = tx_buffers.begin(); |
538 | ibv_sge isge[tx_buffers.size()]; | |
539 | uint32_t current_sge = 0; | |
540 | ibv_send_wr iswr[tx_buffers.size()]; | |
541 | uint32_t current_swr = 0; | |
542 | ibv_send_wr* pre_wr = NULL; | |
11fdf7f2 | 543 | uint32_t num = 0; |
7c673cae | 544 | |
92f5a8d4 | 545 | // FIPS zeroization audit 20191115: these memsets are not security related. |
7c673cae FG |
546 | memset(iswr, 0, sizeof(iswr)); |
547 | memset(isge, 0, sizeof(isge)); | |
11fdf7f2 | 548 | |
7c673cae FG |
549 | while (current_buffer != tx_buffers.end()) { |
550 | isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer); | |
551 | isge[current_sge].length = (*current_buffer)->get_offset(); | |
552 | isge[current_sge].lkey = (*current_buffer)->mr->lkey; | |
553 | ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; | |
554 | ||
555 | iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer); | |
556 | iswr[current_swr].next = NULL; | |
557 | iswr[current_swr].sg_list = &isge[current_sge]; | |
558 | iswr[current_swr].num_sge = 1; | |
559 | iswr[current_swr].opcode = IBV_WR_SEND; | |
560 | iswr[current_swr].send_flags = IBV_SEND_SIGNALED; | |
561 | /*if (isge[current_sge].length < infiniband->max_inline_data) { | |
562 | iswr[current_swr].send_flags = IBV_SEND_INLINE; | |
563 | ldout(cct, 20) << __func__ << " send_inline." << dendl; | |
564 | }*/ | |
565 | ||
11fdf7f2 | 566 | num++; |
7c673cae FG |
567 | worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); |
568 | if (pre_wr) | |
569 | pre_wr->next = &iswr[current_swr]; | |
570 | pre_wr = &iswr[current_swr]; | |
571 | ++current_sge; | |
572 | ++current_swr; | |
573 | ++current_buffer; | |
574 | } | |
575 | ||
576 | ibv_send_wr *bad_tx_work_request; | |
577 | if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { | |
578 | ldout(cct, 1) << __func__ << " failed to send data" | |
579 | << " (most probably should be peer not ready): " | |
580 | << cpp_strerror(errno) << dendl; | |
581 | worker->perf_logger->inc(l_msgr_rdma_tx_failed); | |
582 | return -errno; | |
583 | } | |
11fdf7f2 | 584 | qp->add_tx_wr(num); |
7c673cae FG |
585 | worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); |
586 | ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; | |
587 | return 0; | |
588 | } | |
589 | ||
590 | void RDMAConnectedSocketImpl::fin() { | |
591 | ibv_send_wr wr; | |
92f5a8d4 | 592 | // FIPS zeroization audit 20191115: this memset is not security related. |
7c673cae | 593 | memset(&wr, 0, sizeof(wr)); |
11fdf7f2 | 594 | |
7c673cae FG |
595 | wr.wr_id = reinterpret_cast<uint64_t>(qp); |
596 | wr.num_sge = 0; | |
597 | wr.opcode = IBV_WR_SEND; | |
598 | wr.send_flags = IBV_SEND_SIGNALED; | |
599 | ibv_send_wr* bad_tx_work_request; | |
600 | if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { | |
601 | ldout(cct, 1) << __func__ << " failed to send message=" | |
602 | << " ibv_post_send failed(most probably should be peer not ready): " | |
603 | << cpp_strerror(errno) << dendl; | |
604 | worker->perf_logger->inc(l_msgr_rdma_tx_failed); | |
605 | return ; | |
606 | } | |
11fdf7f2 | 607 | qp->add_tx_wr(1); |
7c673cae FG |
608 | } |
609 | ||
31f18b77 | 610 | void RDMAConnectedSocketImpl::cleanup() { |
7c673cae FG |
611 | if (con_handler && tcp_fd >= 0) { |
612 | (static_cast<C_handle_connection*>(con_handler))->close(); | |
613 | worker->center.submit_to(worker->center.get_id(), [this]() { | |
614 | worker->center.delete_file_event(tcp_fd, EVENT_READABLE); | |
615 | }, false); | |
616 | delete con_handler; | |
617 | con_handler = nullptr; | |
618 | } | |
619 | } | |
620 | ||
621 | void RDMAConnectedSocketImpl::notify() | |
622 | { | |
11fdf7f2 TL |
623 | // note: notify_fd is an event fd (man eventfd) |
624 | // write argument must be a 64bit integer | |
7c673cae | 625 | uint64_t i = 1; |
7c673cae | 626 | |
11fdf7f2 | 627 | ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i))); |
7c673cae FG |
628 | } |
629 | ||
31f18b77 | 630 | void RDMAConnectedSocketImpl::shutdown() |
7c673cae | 631 | { |
31f18b77 FG |
632 | if (!error) |
633 | fin(); | |
634 | error = ECONNRESET; | |
7c673cae FG |
635 | active = false; |
636 | } | |
637 | ||
31f18b77 | 638 | void RDMAConnectedSocketImpl::close() |
7c673cae | 639 | { |
31f18b77 FG |
640 | if (!error) |
641 | fin(); | |
642 | error = ECONNRESET; | |
643 | active = false; | |
7c673cae FG |
644 | } |
645 | ||
646 | void RDMAConnectedSocketImpl::fault() | |
647 | { | |
31f18b77 | 648 | ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; |
7c673cae FG |
649 | /*if (qp) { |
650 | qp->to_dead(); | |
651 | qp = NULL; | |
652 | }*/ | |
653 | error = ECONNRESET; | |
31f18b77 | 654 | connected = 1; |
7c673cae FG |
655 | notify(); |
656 | } | |
31f18b77 FG |
657 | |
658 | void RDMAConnectedSocketImpl::set_accept_fd(int sd) | |
659 | { | |
660 | tcp_fd = sd; | |
661 | is_server = true; | |
662 | worker->center.submit_to(worker->center.get_id(), [this]() { | |
663 | worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); | |
664 | }, true); | |
665 | } | |
11fdf7f2 TL |
666 | |
667 | void RDMAConnectedSocketImpl::post_chunks_to_rq(int num) | |
668 | { | |
669 | post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp()); | |
670 | } | |
671 | ||
672 | void RDMAConnectedSocketImpl::update_post_backlog() | |
673 | { | |
674 | if (post_backlog) | |
675 | post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp()); | |
676 | } |