]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
7c673cae 16#include "RDMAStack.h"
7c673cae 17
9f95a23c
TL
18class C_handle_connection_established : public EventCallback {
19 RDMAConnectedSocketImpl *csi;
20 bool active = true;
21 public:
22 C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
23 void do_request(uint64_t fd) final {
24 if (active)
25 csi->handle_connection_established();
26 }
27 void close() {
28 active = false;
29 }
30};
31
32class C_handle_connection_read : public EventCallback {
33 RDMAConnectedSocketImpl *csi;
34 bool active = true;
35 public:
36 explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {}
37 void do_request(uint64_t fd) final {
38 if (active)
39 csi->handle_connection();
40 }
41 void close() {
42 active = false;
43 }
44};
45
7c673cae
FG
46#define dout_subsys ceph_subsys_ms
47#undef dout_prefix
48#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
49
9f95a23c
TL
50RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
51 shared_ptr<RDMADispatcher>& rdma_dispatcher,
52 RDMAWorker *w)
53 : cct(cct), connected(0), error(0), ib(ib),
54 dispatcher(rdma_dispatcher), worker(w),
55 is_server(false), read_handler(new C_handle_connection_read(this)),
56 established_handler(new C_handle_connection_established(this)),
224ce89b 57 active(false), pending(false)
7c673cae 58{
11fdf7f2 59 if (!cct->_conf->ms_async_rdma_cm) {
9f95a23c
TL
60 qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
61 local_qpn = qp->get_local_qp_number();
11fdf7f2
TL
62 notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
63 dispatcher->register_qp(qp, this);
64 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
65 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
66 }
7c673cae
FG
67}
68
7c673cae
FG
69RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
70{
71 ldout(cct, 20) << __func__ << " destruct." << dendl;
31f18b77 72 cleanup();
7c673cae 73 worker->remove_pending_conn(this);
9f95a23c 74 dispatcher->schedule_qp_destroy(local_qpn);
11fdf7f2
TL
75
76 for (unsigned i=0; i < wc.size(); ++i) {
77 dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
78 }
79 for (unsigned i=0; i < buffers.size(); ++i) {
80 dispatcher->post_chunk_to_pool(buffers[i]);
81 }
82
9f95a23c 83 std::lock_guard l{lock};
7c673cae
FG
84 if (notify_fd >= 0)
85 ::close(notify_fd);
31f18b77
FG
86 if (tcp_fd >= 0)
87 ::close(tcp_fd);
7c673cae 88 error = ECONNRESET;
7c673cae
FG
89}
90
91void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
92{
9f95a23c 93 std::lock_guard l{lock};
7c673cae
FG
94 if (wc.empty())
95 wc = std::move(v);
96 else
97 wc.insert(wc.end(), v.begin(), v.end());
98 notify();
99}
100
101void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
102{
9f95a23c 103 std::lock_guard l{lock};
7c673cae
FG
104 if (wc.empty())
105 return ;
106 w.swap(wc);
107}
108
31f18b77 109int RDMAConnectedSocketImpl::activate()
7c673cae 110{
9f95a23c
TL
111 qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
112 if (qp->modify_qp_to_rtr() != 0)
7c673cae 113 return -1;
7c673cae 114
9f95a23c 115 if (qp->modify_qp_to_rts() != 0)
7c673cae 116 return -1;
7c673cae
FG
117
118 if (!is_server) {
119 connected = 1; //indicate successfully
9f95a23c 120 ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
31f18b77 121 submit(false);
7c673cae
FG
122 }
123 active = true;
9f95a23c 124 peer_qpn = qp->get_local_cm_meta().peer_qpn;
7c673cae
FG
125
126 return 0;
127}
128
31f18b77 129int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
7c673cae
FG
130 ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
131 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
132 NetHandler net(cct);
9f95a23c
TL
133
134 // we construct a socket to transport ib sync message
135 // but we shouldn't block in tcp connecting
136 if (opts.nonblock) {
137 tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
138 } else {
139 tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
140 }
7c673cae
FG
141
142 if (tcp_fd < 0) {
143 return -errno;
144 }
7c673cae
FG
145
146 int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
147 if (r < 0) {
148 ::close(tcp_fd);
149 tcp_fd = -1;
150 return -errno;
151 }
152
153 ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
154 net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
9f95a23c
TL
155 r = 0;
156 if (opts.nonblock) {
157 worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
158 } else {
159 r = handle_connection_established(false);
160 }
161 return r;
162}
7c673cae 163
9f95a23c
TL
164int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
165 ldout(cct, 20) << __func__ << " start " << dendl;
166 // delete read event
167 worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
168 if (1 == connected) {
169 ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
170 if (need_set_fault) {
171 fault();
172 }
173 return -1;
174 }
175 // send handshake msg to server
176 qp->get_local_cm_meta().peer_qpn = 0;
177 int r = qp->send_cm_meta(cct, tcp_fd);
178 if (r < 0) {
179 ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
180 if (need_set_fault) {
181 fault();
182 }
183 return r;
184 }
185 worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
186 ldout(cct, 20) << __func__ << " finish " << dendl;
7c673cae
FG
187 return 0;
188}
189
31f18b77 190void RDMAConnectedSocketImpl::handle_connection() {
9f95a23c
TL
191 ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
192 int r = qp->recv_cm_meta(cct, tcp_fd);
11fdf7f2 193 if (r <= 0) {
7c673cae
FG
194 if (r != -EAGAIN) {
195 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
196 ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
31f18b77 197 fault();
7c673cae
FG
198 }
199 return;
200 }
201
11fdf7f2
TL
202 if (1 == connected) {
203 ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
204 fault();
205 return;
206 }
207
9f95a23c 208 if (!is_server) {// first time: cm meta sync + ack from server
7c673cae
FG
209 if (!connected) {
210 r = activate();
11fdf7f2 211 ceph_assert(!r);
7c673cae 212 }
31f18b77 213 notify();
9f95a23c 214 r = qp->send_cm_meta(cct, tcp_fd);
7c673cae
FG
215 if (r < 0) {
216 ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
217 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
31f18b77 218 fault();
7c673cae
FG
219 }
220 } else {
9f95a23c 221 if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
7c673cae
FG
222 if (active) {
223 ldout(cct, 10) << __func__ << " server is already active." << dendl;
224 return ;
225 }
11fdf7f2
TL
226 r = activate();
227 ceph_assert(!r);
9f95a23c 228 r = qp->send_cm_meta(cct, tcp_fd);
7c673cae
FG
229 if (r < 0) {
230 ldout(cct, 1) << __func__ << " server ack failed." << dendl;
231 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
31f18b77 232 fault();
7c673cae
FG
233 return ;
234 }
9f95a23c 235 } else { // second time: cm meta ack from client
7c673cae 236 connected = 1;
11fdf7f2
TL
237 ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
238 //cleanup();
31f18b77
FG
239 submit(false);
240 notify();
7c673cae
FG
241 }
242 }
243}
244
7c673cae
FG
245ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
246{
9f95a23c
TL
247 eventfd_t event_val = 0;
248 int r = eventfd_read(notify_fd, &event_val);
249 ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
250 << " r = " << r << dendl;
251
11fdf7f2
TL
252 if (!active) {
253 ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
254 return -EAGAIN;
255 }
9f95a23c 256
11fdf7f2
TL
257 if (0 == connected) {
258 ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
259 return -EAGAIN;
260 }
7c673cae 261 ssize_t read = 0;
9f95a23c
TL
262 read = read_buffers(buf,len);
263
264 if (is_server && connected == 0) {
265 ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
266 connected = 1; //if so, we don't need the last handshake
267 cleanup();
268 submit(false);
269 }
270
271 if (!buffers.empty()) {
272 notify();
273 }
274
275 if (read == 0 && error)
276 return -error;
277 return read == 0 ? -EAGAIN : read;
278}
7c673cae 279
9f95a23c
TL
280void RDMAConnectedSocketImpl::buffer_prefetch(void)
281{
7c673cae
FG
282 std::vector<ibv_wc> cqe;
283 get_wc(cqe);
9f95a23c
TL
284 if(cqe.empty())
285 return;
7c673cae 286
9f95a23c 287 for(size_t i = 0; i < cqe.size(); ++i) {
7c673cae 288 ibv_wc* response = &cqe[i];
11fdf7f2 289 ceph_assert(response->status == IBV_WC_SUCCESS);
7c673cae 290 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
7c673cae 291 chunk->prepare_read(response->byte_len);
9f95a23c
TL
292
293 if (chunk->get_size() == 0) {
294 chunk->reset_read_chunk();
7c673cae 295 dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
31f18b77 296 if (connected) {
7c673cae
FG
297 error = ECONNRESET;
298 ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
299 }
11fdf7f2 300 dispatcher->post_chunk_to_pool(chunk);
9f95a23c 301 continue;
7c673cae 302 } else {
9f95a23c
TL
303 buffers.push_back(chunk);
304 ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
7c673cae
FG
305 }
306 }
7c673cae 307 worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
7c673cae
FG
308}
309
310ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
311{
9f95a23c
TL
312 size_t read_size = 0, tmp = 0;
313 buffer_prefetch();
314 auto pchunk = buffers.begin();
315 while (pchunk != buffers.end()) {
316 tmp = (*pchunk)->read(buf + read_size, len - read_size);
317 read_size += tmp;
318 ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
319 << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
320
321 if ((*pchunk)->get_size() == 0) {
322 (*pchunk)->reset_read_chunk();
323 dispatcher->post_chunk_to_pool(*pchunk);
11fdf7f2 324 update_post_backlog();
9f95a23c
TL
325 ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
326 pchunk++;
7c673cae 327 }
7c673cae 328
9f95a23c
TL
329 if (read_size == len) {
330 break;
7c673cae 331 }
7c673cae
FG
332 }
333
9f95a23c
TL
334 buffers.erase(buffers.begin(), pchunk);
335 ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
336 worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
337 return read_size;
7c673cae
FG
338}
339
340ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
341{
342 if (error) {
31f18b77 343 if (!active)
7c673cae
FG
344 return -EPIPE;
345 return -error;
346 }
347 size_t bytes = bl.length();
348 if (!bytes)
349 return 0;
350 {
9f95a23c 351 std::lock_guard l{lock};
7c673cae 352 pending_bl.claim_append(bl);
31f18b77 353 if (!connected) {
9f95a23c 354 ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
7c673cae
FG
355 return bytes;
356 }
357 }
9f95a23c 358 ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
7c673cae
FG
359 ssize_t r = submit(more);
360 if (r < 0 && r != -EAGAIN)
361 return r;
362 return bytes;
363}
364
9f95a23c
TL
365size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
366 size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
367 const decltype(std::cbegin(pending_bl.buffers()))& end)
368{
369 ceph_assert(start != end);
370 auto chunk_idx = tx_buffers.size();
371 if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
372 ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
373 worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
374 return 0;
375 }
376
377 Chunk *current_chunk = tx_buffers[chunk_idx];
378 size_t write_len = 0;
379 while (start != end) {
380 const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
381
382 size_t slice_write_len = 0;
383 while (slice_write_len < start->length()) {
384 size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
385
386 slice_write_len += real_len;
387 write_len += real_len;
388 req_copy_len -= real_len;
389
390 if (current_chunk->full()) {
391 if (++chunk_idx == tx_buffers.size())
392 return write_len;
393 current_chunk = tx_buffers[chunk_idx];
394 }
395 }
396
397 ++start;
398 }
399 ceph_assert(req_copy_len == 0);
400 return write_len;
401}
402
7c673cae
FG
403ssize_t RDMAConnectedSocketImpl::submit(bool more)
404{
405 if (error)
406 return -error;
9f95a23c 407 std::lock_guard l{lock};
7c673cae
FG
408 size_t bytes = pending_bl.length();
409 ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
9f95a23c 410 << pending_bl.get_num_buffers() << dendl;
7c673cae
FG
411 if (!bytes)
412 return 0;
413
7c673cae 414 std::vector<Chunk*> tx_buffers;
11fdf7f2 415 auto it = std::cbegin(pending_bl.buffers());
9f95a23c
TL
416 auto copy_start = it;
417 size_t total_copied = 0, wait_copy_len = 0;
7c673cae 418 while (it != pending_bl.buffers().end()) {
9f95a23c
TL
419 if (ib->is_tx_buffer(it->raw_c_str())) {
420 if (wait_copy_len) {
421 size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
422 total_copied += copied;
423 if (copied < wait_copy_len)
7c673cae 424 goto sending;
9f95a23c 425 wait_copy_len = 0;
7c673cae 426 }
9f95a23c
TL
427 ceph_assert(copy_start == it);
428 tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
429 total_copied += it->length();
430 ++copy_start;
7c673cae 431 } else {
9f95a23c 432 wait_copy_len += it->length();
7c673cae
FG
433 }
434 ++it;
435 }
9f95a23c
TL
436 if (wait_copy_len)
437 total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
7c673cae
FG
438
439 sending:
9f95a23c 440 if (total_copied == 0)
7c673cae 441 return -EAGAIN;
9f95a23c 442 ceph_assert(total_copied <= pending_bl.length());
7c673cae 443 bufferlist swapped;
9f95a23c 444 if (total_copied < pending_bl.length()) {
7c673cae 445 worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
9f95a23c 446 pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
7c673cae
FG
447 pending_bl.swap(swapped);
448 } else {
449 pending_bl.clear();
450 }
451
452 ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
9f95a23c 453 << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
7c673cae
FG
454
455 int r = post_work_request(tx_buffers);
456 if (r < 0)
457 return r;
458
9f95a23c 459 ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
7c673cae
FG
460 return pending_bl.length() ? -EAGAIN : 0;
461}
462
463int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
464{
9f95a23c 465 ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
7c673cae
FG
466 vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
467 ibv_sge isge[tx_buffers.size()];
468 uint32_t current_sge = 0;
469 ibv_send_wr iswr[tx_buffers.size()];
470 uint32_t current_swr = 0;
471 ibv_send_wr* pre_wr = NULL;
11fdf7f2 472 uint32_t num = 0;
7c673cae 473
92f5a8d4 474 // FIPS zeroization audit 20191115: these memsets are not security related.
7c673cae
FG
475 memset(iswr, 0, sizeof(iswr));
476 memset(isge, 0, sizeof(isge));
11fdf7f2 477
7c673cae
FG
478 while (current_buffer != tx_buffers.end()) {
479 isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
480 isge[current_sge].length = (*current_buffer)->get_offset();
481 isge[current_sge].lkey = (*current_buffer)->mr->lkey;
482 ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
483
484 iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
485 iswr[current_swr].next = NULL;
486 iswr[current_swr].sg_list = &isge[current_sge];
487 iswr[current_swr].num_sge = 1;
488 iswr[current_swr].opcode = IBV_WR_SEND;
489 iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
7c673cae 490
11fdf7f2 491 num++;
7c673cae
FG
492 worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
493 if (pre_wr)
494 pre_wr->next = &iswr[current_swr];
495 pre_wr = &iswr[current_swr];
496 ++current_sge;
497 ++current_swr;
498 ++current_buffer;
499 }
500
9f95a23c 501 ibv_send_wr *bad_tx_work_request = nullptr;
7c673cae
FG
502 if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
503 ldout(cct, 1) << __func__ << " failed to send data"
504 << " (most probably should be peer not ready): "
505 << cpp_strerror(errno) << dendl;
506 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
507 return -errno;
508 }
509 worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
9f95a23c 510 ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
7c673cae
FG
511 return 0;
512}
513
514void RDMAConnectedSocketImpl::fin() {
515 ibv_send_wr wr;
92f5a8d4 516 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae 517 memset(&wr, 0, sizeof(wr));
11fdf7f2 518
7c673cae
FG
519 wr.wr_id = reinterpret_cast<uint64_t>(qp);
520 wr.num_sge = 0;
521 wr.opcode = IBV_WR_SEND;
522 wr.send_flags = IBV_SEND_SIGNALED;
9f95a23c 523 ibv_send_wr* bad_tx_work_request = nullptr;
7c673cae
FG
524 if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
525 ldout(cct, 1) << __func__ << " failed to send message="
526 << " ibv_post_send failed(most probably should be peer not ready): "
527 << cpp_strerror(errno) << dendl;
528 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
529 return ;
530 }
531}
532
31f18b77 533void RDMAConnectedSocketImpl::cleanup() {
9f95a23c
TL
534 if (read_handler && tcp_fd >= 0) {
535 (static_cast<C_handle_connection_read*>(read_handler))->close();
7c673cae 536 worker->center.submit_to(worker->center.get_id(), [this]() {
9f95a23c 537 worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
7c673cae 538 }, false);
9f95a23c
TL
539 delete read_handler;
540 read_handler = nullptr;
541 }
542 if (established_handler) {
543 (static_cast<C_handle_connection_established*>(established_handler))->close();
544 delete established_handler;
545 established_handler = nullptr;
7c673cae
FG
546 }
547}
548
549void RDMAConnectedSocketImpl::notify()
550{
9f95a23c
TL
551 eventfd_t event_val = 1;
552 int r = eventfd_write(notify_fd, event_val);
553 ceph_assert(r == 0);
7c673cae
FG
554}
555
31f18b77 556void RDMAConnectedSocketImpl::shutdown()
7c673cae 557{
31f18b77
FG
558 if (!error)
559 fin();
560 error = ECONNRESET;
7c673cae
FG
561 active = false;
562}
563
31f18b77 564void RDMAConnectedSocketImpl::close()
7c673cae 565{
31f18b77
FG
566 if (!error)
567 fin();
568 error = ECONNRESET;
569 active = false;
7c673cae
FG
570}
571
572void RDMAConnectedSocketImpl::fault()
573{
31f18b77 574 ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
7c673cae 575 error = ECONNRESET;
31f18b77 576 connected = 1;
7c673cae
FG
577 notify();
578}
31f18b77
FG
579
580void RDMAConnectedSocketImpl::set_accept_fd(int sd)
581{
582 tcp_fd = sd;
583 is_server = true;
584 worker->center.submit_to(worker->center.get_id(), [this]() {
9f95a23c 585 worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
31f18b77
FG
586 }, true);
587}
11fdf7f2
TL
588
589void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
590{
9f95a23c 591 post_backlog += num - ib->post_chunks_to_rq(num, qp);
11fdf7f2
TL
592}
593
594void RDMAConnectedSocketImpl::update_post_backlog()
595{
596 if (post_backlog)
9f95a23c 597 post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
11fdf7f2 598}