]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
7c673cae 16#include "RDMAStack.h"
7c673cae
FG
17
18#define dout_subsys ceph_subsys_ms
19#undef dout_prefix
20#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
21
7c673cae 22RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
31f18b77
FG
23 RDMAWorker *w)
24 : cct(cct), connected(0), error(0), infiniband(ib),
25 dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
26 is_server(false), con_handler(new C_handle_connection(this)),
224ce89b 27 active(false), pending(false)
7c673cae 28{
11fdf7f2
TL
29 if (!cct->_conf->ms_async_rdma_cm) {
30 qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
31 my_msg.qpn = qp->get_local_qp_number();
32 my_msg.psn = qp->get_initial_psn();
33 my_msg.lid = infiniband->get_lid();
34 my_msg.peer_qpn = 0;
35 my_msg.gid = infiniband->get_gid();
36 notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
37 dispatcher->register_qp(qp, this);
38 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
39 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
40 }
7c673cae
FG
41}
42
7c673cae
FG
43RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
44{
45 ldout(cct, 20) << __func__ << " destruct." << dendl;
31f18b77 46 cleanup();
7c673cae 47 worker->remove_pending_conn(this);
31f18b77 48 dispatcher->erase_qpn(my_msg.qpn);
11fdf7f2
TL
49
50 for (unsigned i=0; i < wc.size(); ++i) {
51 dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
52 }
53 for (unsigned i=0; i < buffers.size(); ++i) {
54 dispatcher->post_chunk_to_pool(buffers[i]);
55 }
56
7c673cae
FG
57 Mutex::Locker l(lock);
58 if (notify_fd >= 0)
59 ::close(notify_fd);
31f18b77
FG
60 if (tcp_fd >= 0)
61 ::close(tcp_fd);
7c673cae 62 error = ECONNRESET;
7c673cae
FG
63}
64
65void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
66{
67 Mutex::Locker l(lock);
68 if (wc.empty())
69 wc = std::move(v);
70 else
71 wc.insert(wc.end(), v.begin(), v.end());
72 notify();
73}
74
75void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
76{
77 Mutex::Locker l(lock);
78 if (wc.empty())
79 return ;
80 w.swap(wc);
81}
82
31f18b77 83int RDMAConnectedSocketImpl::activate()
7c673cae
FG
84{
85 ibv_qp_attr qpa;
86 int r;
87
7c673cae
FG
88 // now connect up the qps and switch to RTR
89 memset(&qpa, 0, sizeof(qpa));
90 qpa.qp_state = IBV_QPS_RTR;
91 qpa.path_mtu = IBV_MTU_1024;
92 qpa.dest_qp_num = peer_msg.qpn;
93 qpa.rq_psn = peer_msg.psn;
94 qpa.max_dest_rd_atomic = 1;
95 qpa.min_rnr_timer = 12;
96 //qpa.ah_attr.is_global = 0;
97 qpa.ah_attr.is_global = 1;
98 qpa.ah_attr.grh.hop_limit = 6;
99 qpa.ah_attr.grh.dgid = peer_msg.gid;
100
31f18b77 101 qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
7c673cae
FG
102
103 qpa.ah_attr.dlid = peer_msg.lid;
104 qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
31f18b77 105 qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
7c673cae 106 qpa.ah_attr.src_path_bits = 0;
31f18b77 107 qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
7c673cae
FG
108
109 ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
110
7c673cae
FG
111 r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
112 IBV_QP_AV |
113 IBV_QP_PATH_MTU |
114 IBV_QP_DEST_QPN |
115 IBV_QP_RQ_PSN |
116 IBV_QP_MIN_RNR_TIMER |
117 IBV_QP_MAX_DEST_RD_ATOMIC);
118 if (r) {
119 lderr(cct) << __func__ << " failed to transition to RTR state: "
120 << cpp_strerror(errno) << dendl;
121 return -1;
122 }
123
124 ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
125
126 // now move to RTS
127 qpa.qp_state = IBV_QPS_RTS;
128
129 // How long to wait before retrying if packet lost or server dead.
130 // Supposedly the timeout is 4.096us*2^timeout. However, the actual
131 // timeout appears to be 4.096us*2^(timeout+1), so the setting
132 // below creates a 135ms timeout.
133 qpa.timeout = 14;
134
135 // How many times to retry after timeouts before giving up.
136 qpa.retry_cnt = 7;
137
138 // How many times to retry after RNR (receiver not ready) condition
139 // before giving up. Occurs when the remote side has not yet posted
140 // a receive request.
141 qpa.rnr_retry = 7; // 7 is infinite retry.
142 qpa.sq_psn = my_msg.psn;
143 qpa.max_rd_atomic = 1;
144
145 r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
146 IBV_QP_TIMEOUT |
147 IBV_QP_RETRY_CNT |
148 IBV_QP_RNR_RETRY |
149 IBV_QP_SQ_PSN |
150 IBV_QP_MAX_QP_RD_ATOMIC);
151 if (r) {
152 lderr(cct) << __func__ << " failed to transition to RTS state: "
153 << cpp_strerror(errno) << dendl;
154 return -1;
155 }
156
157 // the queue pair should be ready to use once the client has finished
158 // setting up their end.
159 ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
160 ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
161
162 if (!is_server) {
163 connected = 1; //indicate successfully
31f18b77
FG
164 ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
165 submit(false);
7c673cae
FG
166 }
167 active = true;
168
169 return 0;
170}
171
31f18b77 172int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
7c673cae
FG
173 ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
174 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
175 NetHandler net(cct);
176 tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
177
178 if (tcp_fd < 0) {
179 return -errno;
180 }
7c673cae
FG
181
182 int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
183 if (r < 0) {
184 ::close(tcp_fd);
185 tcp_fd = -1;
186 return -errno;
187 }
188
189 ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
190 net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
191 my_msg.peer_qpn = 0;
31f18b77 192 r = infiniband->send_msg(cct, tcp_fd, my_msg);
7c673cae
FG
193 if (r < 0)
194 return r;
195
196 worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
197 return 0;
198}
199
31f18b77
FG
200void RDMAConnectedSocketImpl::handle_connection() {
201 ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
202 int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
11fdf7f2 203 if (r <= 0) {
7c673cae
FG
204 if (r != -EAGAIN) {
205 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
206 ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
31f18b77 207 fault();
7c673cae
FG
208 }
209 return;
210 }
211
11fdf7f2
TL
212 if (1 == connected) {
213 ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
214 fault();
215 return;
216 }
217
7c673cae
FG
218 if (!is_server) {// syn + ack from server
219 my_msg.peer_qpn = peer_msg.qpn;
220 ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn
221 << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
222 if (!connected) {
223 r = activate();
11fdf7f2 224 ceph_assert(!r);
7c673cae 225 }
31f18b77
FG
226 notify();
227 r = infiniband->send_msg(cct, tcp_fd, my_msg);
7c673cae
FG
228 if (r < 0) {
229 ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
230 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
31f18b77 231 fault();
7c673cae
FG
232 }
233 } else {
234 if (peer_msg.peer_qpn == 0) {// syn from client
235 if (active) {
236 ldout(cct, 10) << __func__ << " server is already active." << dendl;
237 return ;
238 }
11fdf7f2
TL
239 r = activate();
240 ceph_assert(!r);
31f18b77 241 r = infiniband->send_msg(cct, tcp_fd, my_msg);
7c673cae
FG
242 if (r < 0) {
243 ldout(cct, 1) << __func__ << " server ack failed." << dendl;
244 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
31f18b77 245 fault();
7c673cae
FG
246 return ;
247 }
7c673cae
FG
248 } else { // ack from client
249 connected = 1;
11fdf7f2
TL
250 ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
251 //cleanup();
31f18b77
FG
252 submit(false);
253 notify();
7c673cae
FG
254 }
255 }
256}
257
7c673cae
FG
258ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
259{
260 uint64_t i = 0;
261 int r = ::read(notify_fd, &i, sizeof(i));
31f18b77 262 ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
11fdf7f2
TL
263
264 if (!active) {
265 ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
266 return -EAGAIN;
267 }
268
269 if (0 == connected) {
270 ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
271 return -EAGAIN;
272 }
7c673cae
FG
273 ssize_t read = 0;
274 if (!buffers.empty())
275 read = read_buffers(buf,len);
276
277 std::vector<ibv_wc> cqe;
278 get_wc(cqe);
31f18b77
FG
279 if (cqe.empty()) {
280 if (!buffers.empty()) {
281 notify();
282 }
283 if (read > 0) {
284 return read;
285 }
286 if (error) {
287 return -error;
288 } else {
289 return -EAGAIN;
290 }
291 }
7c673cae 292
31f18b77 293 ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
7c673cae
FG
294 for (size_t i = 0; i < cqe.size(); ++i) {
295 ibv_wc* response = &cqe[i];
11fdf7f2 296 ceph_assert(response->status == IBV_WC_SUCCESS);
7c673cae
FG
297 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
298 ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
299 chunk->prepare_read(response->byte_len);
300 worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
301 if (response->byte_len == 0) {
302 dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
31f18b77 303 if (connected) {
7c673cae
FG
304 error = ECONNRESET;
305 ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
306 }
11fdf7f2 307 dispatcher->post_chunk_to_pool(chunk);
7c673cae
FG
308 } else {
309 if (read == (ssize_t)len) {
310 buffers.push_back(chunk);
311 ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
312 } else if (read + response->byte_len > (ssize_t)len) {
313 read += chunk->read(buf+read, (ssize_t)len-read);
314 buffers.push_back(chunk);
315 ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
316 } else {
317 read += chunk->read(buf+read, response->byte_len);
11fdf7f2
TL
318 dispatcher->post_chunk_to_pool(chunk);
319 update_post_backlog();
7c673cae
FG
320 }
321 }
322 }
323
324 worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
31f18b77
FG
325 if (is_server && connected == 0) {
326 ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
327 connected = 1; //if so, we don't need the last handshake
328 cleanup();
329 submit(false);
330 }
331
332 if (!buffers.empty()) {
333 notify();
334 }
7c673cae
FG
335
336 if (read == 0 && error)
337 return -error;
338 return read == 0 ? -EAGAIN : read;
339}
340
341ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
342{
343 size_t read = 0, tmp = 0;
344 auto c = buffers.begin();
345 for (; c != buffers.end() ; ++c) {
346 tmp = (*c)->read(buf+read, len-read);
347 read += tmp;
348 ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
349 if ((*c)->over()) {
11fdf7f2
TL
350 dispatcher->post_chunk_to_pool(*c);
351 update_post_backlog();
7c673cae
FG
352 ldout(cct, 25) << __func__ << " one chunk over." << dendl;
353 }
354 if (read == len) {
355 break;
356 }
357 }
358
359 if (c != buffers.end() && (*c)->over())
360 ++c;
361 buffers.erase(buffers.begin(), c);
362 ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;
363 return read;
364}
365
366ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
367{
368 if (error)
369 return -error;
370 static const int MAX_COMPLETIONS = 16;
371 ibv_wc wc[MAX_COMPLETIONS];
372 ssize_t size = 0;
373
374 ibv_wc* response;
375 Chunk* chunk;
376 bool loaded = false;
377 auto iter = buffers.begin();
378 if (iter != buffers.end()) {
379 chunk = *iter;
380 // FIXME need to handle release
381 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
382 buffers.erase(iter);
383 loaded = true;
384 size = chunk->bound;
385 }
386
387 std::vector<ibv_wc> cqe;
388 get_wc(cqe);
389 if (cqe.empty())
390 return size == 0 ? -EAGAIN : size;
391
392 ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
393
394 for (size_t i = 0; i < cqe.size(); ++i) {
395 response = &wc[i];
396 chunk = reinterpret_cast<Chunk*>(response->wr_id);
397 chunk->prepare_read(response->byte_len);
398 if (!loaded && i == 0) {
399 // FIXME need to handle release
400 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
401 size = chunk->bound;
402 continue;
403 }
404 buffers.push_back(chunk);
405 iter++;
406 }
407
408 if (size == 0)
409 return -EAGAIN;
410 return size;
411}
412
413ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
414{
415 if (error) {
31f18b77 416 if (!active)
7c673cae
FG
417 return -EPIPE;
418 return -error;
419 }
420 size_t bytes = bl.length();
421 if (!bytes)
422 return 0;
423 {
424 Mutex::Locker l(lock);
425 pending_bl.claim_append(bl);
31f18b77
FG
426 if (!connected) {
427 ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
7c673cae
FG
428 return bytes;
429 }
430 }
31f18b77 431 ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
7c673cae
FG
432 ssize_t r = submit(more);
433 if (r < 0 && r != -EAGAIN)
434 return r;
435 return bytes;
436}
437
438ssize_t RDMAConnectedSocketImpl::submit(bool more)
439{
440 if (error)
441 return -error;
442 Mutex::Locker l(lock);
443 size_t bytes = pending_bl.length();
444 ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
445 << pending_bl.buffers().size() << dendl;
446 if (!bytes)
447 return 0;
448
11fdf7f2
TL
449 auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers,
450 unsigned bytes,
451 auto& start,
452 const auto& end) -> unsigned {
453 ceph_assert(start != end);
7c673cae
FG
454 auto chunk_idx = tx_buffers.size();
455 int ret = worker->get_reged_mem(this, tx_buffers, bytes);
456 if (ret == 0) {
457 ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
458 worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
459 return 0;
460 }
461
462 unsigned total_copied = 0;
463 Chunk *current_chunk = tx_buffers[chunk_idx];
464 while (start != end) {
11fdf7f2 465 const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
7c673cae
FG
466 unsigned copied = 0;
467 while (copied < start->length()) {
468 uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
469 copied += r;
470 total_copied += r;
471 bytes -= r;
472 if (current_chunk->full()){
11fdf7f2 473 if (++chunk_idx == tx_buffers.size())
7c673cae 474 return total_copied;
11fdf7f2 475 current_chunk = tx_buffers[chunk_idx];
7c673cae
FG
476 }
477 }
478 ++start;
479 }
11fdf7f2 480 ceph_assert(bytes == 0);
7c673cae
FG
481 return total_copied;
482 };
483
484 std::vector<Chunk*> tx_buffers;
11fdf7f2
TL
485 auto it = std::cbegin(pending_bl.buffers());
486 auto copy_it = it;
7c673cae
FG
487 unsigned total = 0;
488 unsigned need_reserve_bytes = 0;
489 while (it != pending_bl.buffers().end()) {
31f18b77 490 if (infiniband->is_tx_buffer(it->raw_c_str())) {
7c673cae
FG
491 if (need_reserve_bytes) {
492 unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
493 total += copied;
494 if (copied < need_reserve_bytes)
495 goto sending;
496 need_reserve_bytes = 0;
497 }
11fdf7f2 498 ceph_assert(copy_it == it);
31f18b77 499 tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
7c673cae
FG
500 total += it->length();
501 ++copy_it;
502 } else {
503 need_reserve_bytes += it->length();
504 }
505 ++it;
506 }
507 if (need_reserve_bytes)
508 total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
509
510 sending:
511 if (total == 0)
512 return -EAGAIN;
11fdf7f2 513 ceph_assert(total <= pending_bl.length());
7c673cae
FG
514 bufferlist swapped;
515 if (total < pending_bl.length()) {
516 worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
517 pending_bl.splice(total, pending_bl.length()-total, &swapped);
518 pending_bl.swap(swapped);
519 } else {
520 pending_bl.clear();
521 }
522
523 ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
524 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
525
526 int r = post_work_request(tx_buffers);
527 if (r < 0)
528 return r;
529
530 ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
531 return pending_bl.length() ? -EAGAIN : 0;
532}
533
534int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
535{
31f18b77 536 ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
7c673cae
FG
537 vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
538 ibv_sge isge[tx_buffers.size()];
539 uint32_t current_sge = 0;
540 ibv_send_wr iswr[tx_buffers.size()];
541 uint32_t current_swr = 0;
542 ibv_send_wr* pre_wr = NULL;
11fdf7f2 543 uint32_t num = 0;
7c673cae 544
92f5a8d4 545 // FIPS zeroization audit 20191115: these memsets are not security related.
7c673cae
FG
546 memset(iswr, 0, sizeof(iswr));
547 memset(isge, 0, sizeof(isge));
11fdf7f2 548
7c673cae
FG
549 while (current_buffer != tx_buffers.end()) {
550 isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
551 isge[current_sge].length = (*current_buffer)->get_offset();
552 isge[current_sge].lkey = (*current_buffer)->mr->lkey;
553 ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
554
555 iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
556 iswr[current_swr].next = NULL;
557 iswr[current_swr].sg_list = &isge[current_sge];
558 iswr[current_swr].num_sge = 1;
559 iswr[current_swr].opcode = IBV_WR_SEND;
560 iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
561 /*if (isge[current_sge].length < infiniband->max_inline_data) {
562 iswr[current_swr].send_flags = IBV_SEND_INLINE;
563 ldout(cct, 20) << __func__ << " send_inline." << dendl;
564 }*/
565
11fdf7f2 566 num++;
7c673cae
FG
567 worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
568 if (pre_wr)
569 pre_wr->next = &iswr[current_swr];
570 pre_wr = &iswr[current_swr];
571 ++current_sge;
572 ++current_swr;
573 ++current_buffer;
574 }
575
576 ibv_send_wr *bad_tx_work_request;
577 if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
578 ldout(cct, 1) << __func__ << " failed to send data"
579 << " (most probably should be peer not ready): "
580 << cpp_strerror(errno) << dendl;
581 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
582 return -errno;
583 }
11fdf7f2 584 qp->add_tx_wr(num);
7c673cae
FG
585 worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
586 ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
587 return 0;
588}
589
590void RDMAConnectedSocketImpl::fin() {
591 ibv_send_wr wr;
92f5a8d4 592 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae 593 memset(&wr, 0, sizeof(wr));
11fdf7f2 594
7c673cae
FG
595 wr.wr_id = reinterpret_cast<uint64_t>(qp);
596 wr.num_sge = 0;
597 wr.opcode = IBV_WR_SEND;
598 wr.send_flags = IBV_SEND_SIGNALED;
599 ibv_send_wr* bad_tx_work_request;
600 if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
601 ldout(cct, 1) << __func__ << " failed to send message="
602 << " ibv_post_send failed(most probably should be peer not ready): "
603 << cpp_strerror(errno) << dendl;
604 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
605 return ;
606 }
11fdf7f2 607 qp->add_tx_wr(1);
7c673cae
FG
608}
609
31f18b77 610void RDMAConnectedSocketImpl::cleanup() {
7c673cae
FG
611 if (con_handler && tcp_fd >= 0) {
612 (static_cast<C_handle_connection*>(con_handler))->close();
613 worker->center.submit_to(worker->center.get_id(), [this]() {
614 worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
615 }, false);
616 delete con_handler;
617 con_handler = nullptr;
618 }
619}
620
621void RDMAConnectedSocketImpl::notify()
622{
11fdf7f2
TL
623 // note: notify_fd is an event fd (man eventfd)
624 // write argument must be a 64bit integer
7c673cae 625 uint64_t i = 1;
7c673cae 626
11fdf7f2 627 ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
7c673cae
FG
628}
629
31f18b77 630void RDMAConnectedSocketImpl::shutdown()
7c673cae 631{
31f18b77
FG
632 if (!error)
633 fin();
634 error = ECONNRESET;
7c673cae
FG
635 active = false;
636}
637
31f18b77 638void RDMAConnectedSocketImpl::close()
7c673cae 639{
31f18b77
FG
640 if (!error)
641 fin();
642 error = ECONNRESET;
643 active = false;
7c673cae
FG
644}
645
646void RDMAConnectedSocketImpl::fault()
647{
31f18b77 648 ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
7c673cae
FG
649 /*if (qp) {
650 qp->to_dead();
651 qp = NULL;
652 }*/
653 error = ECONNRESET;
31f18b77 654 connected = 1;
7c673cae
FG
655 notify();
656}
31f18b77
FG
657
658void RDMAConnectedSocketImpl::set_accept_fd(int sd)
659{
660 tcp_fd = sd;
661 is_server = true;
662 worker->center.submit_to(worker->center.get_id(), [this]() {
663 worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
664 }, true);
665}
11fdf7f2
TL
666
667void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
668{
669 post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
670}
671
672void RDMAConnectedSocketImpl::update_post_backlog()
673{
674 if (post_backlog)
675 post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
676}