]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "RDMAStack.h" | |
7c673cae FG |
18 | |
19 | #define dout_subsys ceph_subsys_ms | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << " RDMAConnectedSocketImpl " | |
22 | ||
7c673cae | 23 | RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, |
31f18b77 FG |
24 | RDMAWorker *w) |
25 | : cct(cct), connected(0), error(0), infiniband(ib), | |
26 | dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), | |
27 | is_server(false), con_handler(new C_handle_connection(this)), | |
224ce89b | 28 | active(false), pending(false) |
7c673cae | 29 | { |
31f18b77 FG |
30 | qp = infiniband->create_queue_pair( |
31 | cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC); | |
32 | my_msg.qpn = qp->get_local_qp_number(); | |
7c673cae | 33 | my_msg.psn = qp->get_initial_psn(); |
31f18b77 | 34 | my_msg.lid = infiniband->get_lid(); |
7c673cae | 35 | my_msg.peer_qpn = 0; |
31f18b77 | 36 | my_msg.gid = infiniband->get_gid(); |
7c673cae FG |
37 | notify_fd = dispatcher->register_qp(qp, this); |
38 | dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); | |
39 | dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); | |
40 | } | |
41 | ||
7c673cae FG |
42 | RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() |
43 | { | |
44 | ldout(cct, 20) << __func__ << " destruct." << dendl; | |
31f18b77 | 45 | cleanup(); |
7c673cae | 46 | worker->remove_pending_conn(this); |
31f18b77 | 47 | dispatcher->erase_qpn(my_msg.qpn); |
7c673cae FG |
48 | Mutex::Locker l(lock); |
49 | if (notify_fd >= 0) | |
50 | ::close(notify_fd); | |
31f18b77 FG |
51 | if (tcp_fd >= 0) |
52 | ::close(tcp_fd); | |
7c673cae FG |
53 | error = ECONNRESET; |
54 | int ret = 0; | |
55 | for (unsigned i=0; i < wc.size(); ++i) { | |
31f18b77 | 56 | ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id)); |
7c673cae FG |
57 | assert(ret == 0); |
58 | dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); | |
59 | } | |
60 | for (unsigned i=0; i < buffers.size(); ++i) { | |
31f18b77 | 61 | ret = infiniband->post_chunk(buffers[i]); |
7c673cae FG |
62 | assert(ret == 0); |
63 | dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); | |
64 | } | |
7c673cae FG |
65 | } |
66 | ||
67 | void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v) | |
68 | { | |
69 | Mutex::Locker l(lock); | |
70 | if (wc.empty()) | |
71 | wc = std::move(v); | |
72 | else | |
73 | wc.insert(wc.end(), v.begin(), v.end()); | |
74 | notify(); | |
75 | } | |
76 | ||
77 | void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w) | |
78 | { | |
79 | Mutex::Locker l(lock); | |
80 | if (wc.empty()) | |
81 | return ; | |
82 | w.swap(wc); | |
83 | } | |
84 | ||
31f18b77 | 85 | int RDMAConnectedSocketImpl::activate() |
7c673cae FG |
86 | { |
87 | ibv_qp_attr qpa; | |
88 | int r; | |
89 | ||
7c673cae FG |
90 | // now connect up the qps and switch to RTR |
91 | memset(&qpa, 0, sizeof(qpa)); | |
92 | qpa.qp_state = IBV_QPS_RTR; | |
93 | qpa.path_mtu = IBV_MTU_1024; | |
94 | qpa.dest_qp_num = peer_msg.qpn; | |
95 | qpa.rq_psn = peer_msg.psn; | |
96 | qpa.max_dest_rd_atomic = 1; | |
97 | qpa.min_rnr_timer = 12; | |
98 | //qpa.ah_attr.is_global = 0; | |
99 | qpa.ah_attr.is_global = 1; | |
100 | qpa.ah_attr.grh.hop_limit = 6; | |
101 | qpa.ah_attr.grh.dgid = peer_msg.gid; | |
102 | ||
31f18b77 | 103 | qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); |
7c673cae FG |
104 | |
105 | qpa.ah_attr.dlid = peer_msg.lid; | |
106 | qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; | |
31f18b77 | 107 | qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; |
7c673cae | 108 | qpa.ah_attr.src_path_bits = 0; |
31f18b77 | 109 | qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); |
7c673cae FG |
110 | |
111 | ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; | |
112 | ||
7c673cae FG |
113 | r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | |
114 | IBV_QP_AV | | |
115 | IBV_QP_PATH_MTU | | |
116 | IBV_QP_DEST_QPN | | |
117 | IBV_QP_RQ_PSN | | |
118 | IBV_QP_MIN_RNR_TIMER | | |
119 | IBV_QP_MAX_DEST_RD_ATOMIC); | |
120 | if (r) { | |
121 | lderr(cct) << __func__ << " failed to transition to RTR state: " | |
122 | << cpp_strerror(errno) << dendl; | |
123 | return -1; | |
124 | } | |
125 | ||
126 | ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; | |
127 | ||
128 | // now move to RTS | |
129 | qpa.qp_state = IBV_QPS_RTS; | |
130 | ||
131 | // How long to wait before retrying if packet lost or server dead. | |
132 | // Supposedly the timeout is 4.096us*2^timeout. However, the actual | |
133 | // timeout appears to be 4.096us*2^(timeout+1), so the setting | |
134 | // below creates a 135ms timeout. | |
135 | qpa.timeout = 14; | |
136 | ||
137 | // How many times to retry after timeouts before giving up. | |
138 | qpa.retry_cnt = 7; | |
139 | ||
140 | // How many times to retry after RNR (receiver not ready) condition | |
141 | // before giving up. Occurs when the remote side has not yet posted | |
142 | // a receive request. | |
143 | qpa.rnr_retry = 7; // 7 is infinite retry. | |
144 | qpa.sq_psn = my_msg.psn; | |
145 | qpa.max_rd_atomic = 1; | |
146 | ||
147 | r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | | |
148 | IBV_QP_TIMEOUT | | |
149 | IBV_QP_RETRY_CNT | | |
150 | IBV_QP_RNR_RETRY | | |
151 | IBV_QP_SQ_PSN | | |
152 | IBV_QP_MAX_QP_RD_ATOMIC); | |
153 | if (r) { | |
154 | lderr(cct) << __func__ << " failed to transition to RTS state: " | |
155 | << cpp_strerror(errno) << dendl; | |
156 | return -1; | |
157 | } | |
158 | ||
159 | // the queue pair should be ready to use once the client has finished | |
160 | // setting up their end. | |
161 | ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; | |
162 | ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; | |
163 | ||
164 | if (!is_server) { | |
165 | connected = 1; //indicate successfully | |
31f18b77 FG |
166 | ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; |
167 | submit(false); | |
7c673cae FG |
168 | } |
169 | active = true; | |
170 | ||
171 | return 0; | |
172 | } | |
173 | ||
31f18b77 | 174 | int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { |
7c673cae FG |
175 | ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" |
176 | << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; | |
177 | NetHandler net(cct); | |
178 | tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); | |
179 | ||
180 | if (tcp_fd < 0) { | |
181 | return -errno; | |
182 | } | |
183 | net.set_close_on_exec(tcp_fd); | |
184 | ||
185 | int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); | |
186 | if (r < 0) { | |
187 | ::close(tcp_fd); | |
188 | tcp_fd = -1; | |
189 | return -errno; | |
190 | } | |
191 | ||
192 | ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; | |
193 | net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); | |
194 | my_msg.peer_qpn = 0; | |
31f18b77 | 195 | r = infiniband->send_msg(cct, tcp_fd, my_msg); |
7c673cae FG |
196 | if (r < 0) |
197 | return r; | |
198 | ||
199 | worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); | |
200 | return 0; | |
201 | } | |
202 | ||
31f18b77 FG |
203 | void RDMAConnectedSocketImpl::handle_connection() { |
204 | ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; | |
205 | int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); | |
7c673cae FG |
206 | if (r < 0) { |
207 | if (r != -EAGAIN) { | |
208 | dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); | |
209 | ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; | |
31f18b77 | 210 | fault(); |
7c673cae FG |
211 | } |
212 | return; | |
213 | } | |
214 | ||
215 | if (!is_server) {// syn + ack from server | |
216 | my_msg.peer_qpn = peer_msg.qpn; | |
217 | ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn | |
218 | << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; | |
219 | if (!connected) { | |
220 | r = activate(); | |
221 | assert(!r); | |
222 | } | |
31f18b77 FG |
223 | notify(); |
224 | r = infiniband->send_msg(cct, tcp_fd, my_msg); | |
7c673cae FG |
225 | if (r < 0) { |
226 | ldout(cct, 1) << __func__ << " send client ack failed." << dendl; | |
227 | dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); | |
31f18b77 | 228 | fault(); |
7c673cae FG |
229 | } |
230 | } else { | |
231 | if (peer_msg.peer_qpn == 0) {// syn from client | |
232 | if (active) { | |
233 | ldout(cct, 10) << __func__ << " server is already active." << dendl; | |
234 | return ; | |
235 | } | |
31f18b77 | 236 | r = infiniband->send_msg(cct, tcp_fd, my_msg); |
7c673cae FG |
237 | if (r < 0) { |
238 | ldout(cct, 1) << __func__ << " server ack failed." << dendl; | |
239 | dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); | |
31f18b77 | 240 | fault(); |
7c673cae FG |
241 | return ; |
242 | } | |
243 | r = activate(); | |
244 | assert(!r); | |
245 | } else { // ack from client | |
246 | connected = 1; | |
247 | cleanup(); | |
31f18b77 FG |
248 | submit(false); |
249 | notify(); | |
7c673cae FG |
250 | } |
251 | } | |
252 | } | |
253 | ||
7c673cae FG |
254 | ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) |
255 | { | |
256 | uint64_t i = 0; | |
257 | int r = ::read(notify_fd, &i, sizeof(i)); | |
31f18b77 | 258 | ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; |
7c673cae FG |
259 | ssize_t read = 0; |
260 | if (!buffers.empty()) | |
261 | read = read_buffers(buf,len); | |
262 | ||
263 | std::vector<ibv_wc> cqe; | |
264 | get_wc(cqe); | |
31f18b77 FG |
265 | if (cqe.empty()) { |
266 | if (!buffers.empty()) { | |
267 | notify(); | |
268 | } | |
269 | if (read > 0) { | |
270 | return read; | |
271 | } | |
272 | if (error) { | |
273 | return -error; | |
274 | } else { | |
275 | return -EAGAIN; | |
276 | } | |
277 | } | |
7c673cae | 278 | |
31f18b77 | 279 | ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; |
7c673cae FG |
280 | for (size_t i = 0; i < cqe.size(); ++i) { |
281 | ibv_wc* response = &cqe[i]; | |
282 | assert(response->status == IBV_WC_SUCCESS); | |
283 | Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id); | |
284 | ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; | |
285 | chunk->prepare_read(response->byte_len); | |
286 | worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); | |
287 | if (response->byte_len == 0) { | |
288 | dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); | |
31f18b77 | 289 | if (connected) { |
7c673cae FG |
290 | error = ECONNRESET; |
291 | ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; | |
292 | } | |
31f18b77 | 293 | assert(infiniband->post_chunk(chunk) == 0); |
7c673cae FG |
294 | dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); |
295 | } else { | |
296 | if (read == (ssize_t)len) { | |
297 | buffers.push_back(chunk); | |
298 | ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; | |
299 | } else if (read + response->byte_len > (ssize_t)len) { | |
300 | read += chunk->read(buf+read, (ssize_t)len-read); | |
301 | buffers.push_back(chunk); | |
302 | ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; | |
303 | } else { | |
304 | read += chunk->read(buf+read, response->byte_len); | |
31f18b77 | 305 | assert(infiniband->post_chunk(chunk) == 0); |
7c673cae FG |
306 | dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); |
307 | } | |
308 | } | |
309 | } | |
310 | ||
311 | worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); | |
31f18b77 FG |
312 | if (is_server && connected == 0) { |
313 | ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; | |
314 | connected = 1; //if so, we don't need the last handshake | |
315 | cleanup(); | |
316 | submit(false); | |
317 | } | |
318 | ||
319 | if (!buffers.empty()) { | |
320 | notify(); | |
321 | } | |
7c673cae FG |
322 | |
323 | if (read == 0 && error) | |
324 | return -error; | |
325 | return read == 0 ? -EAGAIN : read; | |
326 | } | |
327 | ||
328 | ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) | |
329 | { | |
330 | size_t read = 0, tmp = 0; | |
331 | auto c = buffers.begin(); | |
332 | for (; c != buffers.end() ; ++c) { | |
333 | tmp = (*c)->read(buf+read, len-read); | |
334 | read += tmp; | |
335 | ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; | |
336 | if ((*c)->over()) { | |
31f18b77 | 337 | assert(infiniband->post_chunk(*c) == 0); |
7c673cae FG |
338 | dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); |
339 | ldout(cct, 25) << __func__ << " one chunk over." << dendl; | |
340 | } | |
341 | if (read == len) { | |
342 | break; | |
343 | } | |
344 | } | |
345 | ||
346 | if (c != buffers.end() && (*c)->over()) | |
347 | ++c; | |
348 | buffers.erase(buffers.begin(), c); | |
349 | ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; | |
350 | return read; | |
351 | } | |
352 | ||
353 | ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) | |
354 | { | |
355 | if (error) | |
356 | return -error; | |
357 | static const int MAX_COMPLETIONS = 16; | |
358 | ibv_wc wc[MAX_COMPLETIONS]; | |
359 | ssize_t size = 0; | |
360 | ||
361 | ibv_wc* response; | |
362 | Chunk* chunk; | |
363 | bool loaded = false; | |
364 | auto iter = buffers.begin(); | |
365 | if (iter != buffers.end()) { | |
366 | chunk = *iter; | |
367 | // FIXME need to handle release | |
368 | // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); | |
369 | buffers.erase(iter); | |
370 | loaded = true; | |
371 | size = chunk->bound; | |
372 | } | |
373 | ||
374 | std::vector<ibv_wc> cqe; | |
375 | get_wc(cqe); | |
376 | if (cqe.empty()) | |
377 | return size == 0 ? -EAGAIN : size; | |
378 | ||
379 | ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; | |
380 | ||
381 | for (size_t i = 0; i < cqe.size(); ++i) { | |
382 | response = &wc[i]; | |
383 | chunk = reinterpret_cast<Chunk*>(response->wr_id); | |
384 | chunk->prepare_read(response->byte_len); | |
385 | if (!loaded && i == 0) { | |
386 | // FIXME need to handle release | |
387 | // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); | |
388 | size = chunk->bound; | |
389 | continue; | |
390 | } | |
391 | buffers.push_back(chunk); | |
392 | iter++; | |
393 | } | |
394 | ||
395 | if (size == 0) | |
396 | return -EAGAIN; | |
397 | return size; | |
398 | } | |
399 | ||
400 | ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) | |
401 | { | |
402 | if (error) { | |
31f18b77 | 403 | if (!active) |
7c673cae FG |
404 | return -EPIPE; |
405 | return -error; | |
406 | } | |
407 | size_t bytes = bl.length(); | |
408 | if (!bytes) | |
409 | return 0; | |
410 | { | |
411 | Mutex::Locker l(lock); | |
412 | pending_bl.claim_append(bl); | |
31f18b77 FG |
413 | if (!connected) { |
414 | ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; | |
7c673cae FG |
415 | return bytes; |
416 | } | |
417 | } | |
31f18b77 | 418 | ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; |
7c673cae FG |
419 | ssize_t r = submit(more); |
420 | if (r < 0 && r != -EAGAIN) | |
421 | return r; | |
422 | return bytes; | |
423 | } | |
424 | ||
425 | ssize_t RDMAConnectedSocketImpl::submit(bool more) | |
426 | { | |
427 | if (error) | |
428 | return -error; | |
429 | Mutex::Locker l(lock); | |
430 | size_t bytes = pending_bl.length(); | |
431 | ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " | |
432 | << pending_bl.buffers().size() << dendl; | |
433 | if (!bytes) | |
434 | return 0; | |
435 | ||
436 | auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes, | |
437 | std::list<bufferptr>::const_iterator &start, | |
438 | std::list<bufferptr>::const_iterator &end) -> unsigned { | |
439 | assert(start != end); | |
440 | auto chunk_idx = tx_buffers.size(); | |
441 | int ret = worker->get_reged_mem(this, tx_buffers, bytes); | |
442 | if (ret == 0) { | |
443 | ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; | |
444 | worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); | |
445 | return 0; | |
446 | } | |
447 | ||
448 | unsigned total_copied = 0; | |
449 | Chunk *current_chunk = tx_buffers[chunk_idx]; | |
450 | while (start != end) { | |
451 | const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str()); | |
452 | unsigned copied = 0; | |
453 | while (copied < start->length()) { | |
454 | uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied); | |
455 | copied += r; | |
456 | total_copied += r; | |
457 | bytes -= r; | |
458 | if (current_chunk->full()){ | |
459 | current_chunk = tx_buffers[++chunk_idx]; | |
460 | if (chunk_idx == tx_buffers.size()) | |
461 | return total_copied; | |
462 | } | |
463 | } | |
464 | ++start; | |
465 | } | |
466 | assert(bytes == 0); | |
467 | return total_copied; | |
468 | }; | |
469 | ||
470 | std::vector<Chunk*> tx_buffers; | |
471 | std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin(); | |
472 | std::list<bufferptr>::const_iterator copy_it = it; | |
473 | unsigned total = 0; | |
474 | unsigned need_reserve_bytes = 0; | |
475 | while (it != pending_bl.buffers().end()) { | |
31f18b77 | 476 | if (infiniband->is_tx_buffer(it->raw_c_str())) { |
7c673cae FG |
477 | if (need_reserve_bytes) { |
478 | unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); | |
479 | total += copied; | |
480 | if (copied < need_reserve_bytes) | |
481 | goto sending; | |
482 | need_reserve_bytes = 0; | |
483 | } | |
484 | assert(copy_it == it); | |
31f18b77 | 485 | tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); |
7c673cae FG |
486 | total += it->length(); |
487 | ++copy_it; | |
488 | } else { | |
489 | need_reserve_bytes += it->length(); | |
490 | } | |
491 | ++it; | |
492 | } | |
493 | if (need_reserve_bytes) | |
494 | total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); | |
495 | ||
496 | sending: | |
497 | if (total == 0) | |
498 | return -EAGAIN; | |
499 | assert(total <= pending_bl.length()); | |
500 | bufferlist swapped; | |
501 | if (total < pending_bl.length()) { | |
502 | worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); | |
503 | pending_bl.splice(total, pending_bl.length()-total, &swapped); | |
504 | pending_bl.swap(swapped); | |
505 | } else { | |
506 | pending_bl.clear(); | |
507 | } | |
508 | ||
509 | ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " | |
510 | << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl; | |
511 | ||
512 | int r = post_work_request(tx_buffers); | |
513 | if (r < 0) | |
514 | return r; | |
515 | ||
516 | ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; | |
517 | return pending_bl.length() ? -EAGAIN : 0; | |
518 | } | |
519 | ||
520 | int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers) | |
521 | { | |
31f18b77 | 522 | ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; |
7c673cae FG |
523 | vector<Chunk*>::iterator current_buffer = tx_buffers.begin(); |
524 | ibv_sge isge[tx_buffers.size()]; | |
525 | uint32_t current_sge = 0; | |
526 | ibv_send_wr iswr[tx_buffers.size()]; | |
527 | uint32_t current_swr = 0; | |
528 | ibv_send_wr* pre_wr = NULL; | |
529 | ||
530 | memset(iswr, 0, sizeof(iswr)); | |
531 | memset(isge, 0, sizeof(isge)); | |
532 | current_buffer = tx_buffers.begin(); | |
533 | while (current_buffer != tx_buffers.end()) { | |
534 | isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer); | |
535 | isge[current_sge].length = (*current_buffer)->get_offset(); | |
536 | isge[current_sge].lkey = (*current_buffer)->mr->lkey; | |
537 | ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; | |
538 | ||
539 | iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer); | |
540 | iswr[current_swr].next = NULL; | |
541 | iswr[current_swr].sg_list = &isge[current_sge]; | |
542 | iswr[current_swr].num_sge = 1; | |
543 | iswr[current_swr].opcode = IBV_WR_SEND; | |
544 | iswr[current_swr].send_flags = IBV_SEND_SIGNALED; | |
545 | /*if (isge[current_sge].length < infiniband->max_inline_data) { | |
546 | iswr[current_swr].send_flags = IBV_SEND_INLINE; | |
547 | ldout(cct, 20) << __func__ << " send_inline." << dendl; | |
548 | }*/ | |
549 | ||
550 | worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); | |
551 | if (pre_wr) | |
552 | pre_wr->next = &iswr[current_swr]; | |
553 | pre_wr = &iswr[current_swr]; | |
554 | ++current_sge; | |
555 | ++current_swr; | |
556 | ++current_buffer; | |
557 | } | |
558 | ||
559 | ibv_send_wr *bad_tx_work_request; | |
560 | if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { | |
561 | ldout(cct, 1) << __func__ << " failed to send data" | |
562 | << " (most probably should be peer not ready): " | |
563 | << cpp_strerror(errno) << dendl; | |
564 | worker->perf_logger->inc(l_msgr_rdma_tx_failed); | |
565 | return -errno; | |
566 | } | |
567 | worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); | |
568 | ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; | |
569 | return 0; | |
570 | } | |
571 | ||
572 | void RDMAConnectedSocketImpl::fin() { | |
573 | ibv_send_wr wr; | |
574 | memset(&wr, 0, sizeof(wr)); | |
575 | wr.wr_id = reinterpret_cast<uint64_t>(qp); | |
576 | wr.num_sge = 0; | |
577 | wr.opcode = IBV_WR_SEND; | |
578 | wr.send_flags = IBV_SEND_SIGNALED; | |
579 | ibv_send_wr* bad_tx_work_request; | |
580 | if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { | |
581 | ldout(cct, 1) << __func__ << " failed to send message=" | |
582 | << " ibv_post_send failed(most probably should be peer not ready): " | |
583 | << cpp_strerror(errno) << dendl; | |
584 | worker->perf_logger->inc(l_msgr_rdma_tx_failed); | |
585 | return ; | |
586 | } | |
587 | } | |
588 | ||
31f18b77 | 589 | void RDMAConnectedSocketImpl::cleanup() { |
7c673cae FG |
590 | if (con_handler && tcp_fd >= 0) { |
591 | (static_cast<C_handle_connection*>(con_handler))->close(); | |
592 | worker->center.submit_to(worker->center.get_id(), [this]() { | |
593 | worker->center.delete_file_event(tcp_fd, EVENT_READABLE); | |
594 | }, false); | |
595 | delete con_handler; | |
596 | con_handler = nullptr; | |
597 | } | |
598 | } | |
599 | ||
600 | void RDMAConnectedSocketImpl::notify() | |
601 | { | |
602 | uint64_t i = 1; | |
603 | int ret; | |
604 | ||
605 | ret = write(notify_fd, &i, sizeof(i)); | |
606 | assert(ret = sizeof(i)); | |
607 | } | |
608 | ||
31f18b77 | 609 | void RDMAConnectedSocketImpl::shutdown() |
7c673cae | 610 | { |
31f18b77 FG |
611 | if (!error) |
612 | fin(); | |
613 | error = ECONNRESET; | |
7c673cae FG |
614 | active = false; |
615 | } | |
616 | ||
31f18b77 | 617 | void RDMAConnectedSocketImpl::close() |
7c673cae | 618 | { |
31f18b77 FG |
619 | if (!error) |
620 | fin(); | |
621 | error = ECONNRESET; | |
622 | active = false; | |
7c673cae FG |
623 | } |
624 | ||
625 | void RDMAConnectedSocketImpl::fault() | |
626 | { | |
31f18b77 | 627 | ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; |
7c673cae FG |
628 | /*if (qp) { |
629 | qp->to_dead(); | |
630 | qp = NULL; | |
631 | }*/ | |
632 | error = ECONNRESET; | |
31f18b77 | 633 | connected = 1; |
7c673cae FG |
634 | notify(); |
635 | } | |
31f18b77 FG |
636 | |
637 | void RDMAConnectedSocketImpl::set_accept_fd(int sd) | |
638 | { | |
639 | tcp_fd = sd; | |
640 | is_server = true; | |
641 | worker->center.submit_to(worker->center.get_id(), [this]() { | |
642 | worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); | |
643 | }, true); | |
644 | } |