]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
7c673cae 16#include "RDMAStack.h"
7c673cae 17
9f95a23c
TL
18class C_handle_connection_established : public EventCallback {
19 RDMAConnectedSocketImpl *csi;
20 bool active = true;
21 public:
22 C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
23 void do_request(uint64_t fd) final {
24 if (active)
25 csi->handle_connection_established();
26 }
27 void close() {
28 active = false;
29 }
30};
31
32class C_handle_connection_read : public EventCallback {
33 RDMAConnectedSocketImpl *csi;
34 bool active = true;
35 public:
36 explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {}
37 void do_request(uint64_t fd) final {
38 if (active)
39 csi->handle_connection();
40 }
41 void close() {
42 active = false;
43 }
44};
45
7c673cae
FG
46#define dout_subsys ceph_subsys_ms
47#undef dout_prefix
48#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
49
f67539c2
TL
50RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband> &ib,
51 std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
9f95a23c
TL
52 RDMAWorker *w)
53 : cct(cct), connected(0), error(0), ib(ib),
54 dispatcher(rdma_dispatcher), worker(w),
55 is_server(false), read_handler(new C_handle_connection_read(this)),
56 established_handler(new C_handle_connection_established(this)),
224ce89b 57 active(false), pending(false)
7c673cae 58{
11fdf7f2 59 if (!cct->_conf->ms_async_rdma_cm) {
9f95a23c 60 qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
f67539c2
TL
61 if (!qp) {
62 lderr(cct) << __func__ << " queue pair create failed" << dendl;
63 return;
64 }
9f95a23c 65 local_qpn = qp->get_local_qp_number();
11fdf7f2
TL
66 notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
67 dispatcher->register_qp(qp, this);
68 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
69 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
70 }
7c673cae
FG
71}
72
7c673cae
FG
73RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
74{
75 ldout(cct, 20) << __func__ << " destruct." << dendl;
31f18b77 76 cleanup();
7c673cae 77 worker->remove_pending_conn(this);
9f95a23c 78 dispatcher->schedule_qp_destroy(local_qpn);
11fdf7f2
TL
79
80 for (unsigned i=0; i < wc.size(); ++i) {
81 dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
82 }
83 for (unsigned i=0; i < buffers.size(); ++i) {
84 dispatcher->post_chunk_to_pool(buffers[i]);
85 }
86
9f95a23c 87 std::lock_guard l{lock};
7c673cae
FG
88 if (notify_fd >= 0)
89 ::close(notify_fd);
31f18b77
FG
90 if (tcp_fd >= 0)
91 ::close(tcp_fd);
7c673cae 92 error = ECONNRESET;
7c673cae
FG
93}
94
95void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
96{
9f95a23c 97 std::lock_guard l{lock};
7c673cae
FG
98 if (wc.empty())
99 wc = std::move(v);
100 else
101 wc.insert(wc.end(), v.begin(), v.end());
102 notify();
103}
104
105void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
106{
9f95a23c 107 std::lock_guard l{lock};
7c673cae
FG
108 if (wc.empty())
109 return ;
110 w.swap(wc);
111}
112
31f18b77 113int RDMAConnectedSocketImpl::activate()
7c673cae 114{
9f95a23c
TL
115 qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
116 if (qp->modify_qp_to_rtr() != 0)
7c673cae 117 return -1;
7c673cae 118
9f95a23c 119 if (qp->modify_qp_to_rts() != 0)
7c673cae 120 return -1;
7c673cae
FG
121
122 if (!is_server) {
123 connected = 1; //indicate successfully
9f95a23c 124 ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
31f18b77 125 submit(false);
7c673cae
FG
126 }
127 active = true;
9f95a23c 128 peer_qpn = qp->get_local_cm_meta().peer_qpn;
7c673cae
FG
129
130 return 0;
131}
132
31f18b77 133int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
7c673cae
FG
134 ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
135 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
f67539c2 136 ceph::NetHandler net(cct);
9f95a23c
TL
137
138 // we construct a socket to transport ib sync message
139 // but we shouldn't block in tcp connecting
140 if (opts.nonblock) {
141 tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
142 } else {
143 tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
144 }
7c673cae
FG
145
146 if (tcp_fd < 0) {
147 return -errno;
148 }
7c673cae
FG
149
150 int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
151 if (r < 0) {
152 ::close(tcp_fd);
153 tcp_fd = -1;
154 return -errno;
155 }
156
157 ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
158 net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
9f95a23c
TL
159 r = 0;
160 if (opts.nonblock) {
161 worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
162 } else {
163 r = handle_connection_established(false);
164 }
165 return r;
166}
7c673cae 167
9f95a23c
TL
168int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
169 ldout(cct, 20) << __func__ << " start " << dendl;
170 // delete read event
171 worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
172 if (1 == connected) {
173 ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
174 if (need_set_fault) {
175 fault();
176 }
177 return -1;
178 }
179 // send handshake msg to server
180 qp->get_local_cm_meta().peer_qpn = 0;
181 int r = qp->send_cm_meta(cct, tcp_fd);
182 if (r < 0) {
183 ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
184 if (need_set_fault) {
185 fault();
186 }
187 return r;
188 }
189 worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
190 ldout(cct, 20) << __func__ << " finish " << dendl;
7c673cae
FG
191 return 0;
192}
193
31f18b77 194void RDMAConnectedSocketImpl::handle_connection() {
9f95a23c
TL
195 ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
196 int r = qp->recv_cm_meta(cct, tcp_fd);
11fdf7f2 197 if (r <= 0) {
7c673cae
FG
198 if (r != -EAGAIN) {
199 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
200 ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
31f18b77 201 fault();
7c673cae
FG
202 }
203 return;
204 }
205
11fdf7f2
TL
206 if (1 == connected) {
207 ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
208 fault();
209 return;
210 }
211
9f95a23c 212 if (!is_server) {// first time: cm meta sync + ack from server
7c673cae
FG
213 if (!connected) {
214 r = activate();
11fdf7f2 215 ceph_assert(!r);
7c673cae 216 }
31f18b77 217 notify();
9f95a23c 218 r = qp->send_cm_meta(cct, tcp_fd);
7c673cae
FG
219 if (r < 0) {
220 ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
221 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
31f18b77 222 fault();
7c673cae
FG
223 }
224 } else {
9f95a23c 225 if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
7c673cae
FG
226 if (active) {
227 ldout(cct, 10) << __func__ << " server is already active." << dendl;
228 return ;
229 }
11fdf7f2
TL
230 r = activate();
231 ceph_assert(!r);
9f95a23c 232 r = qp->send_cm_meta(cct, tcp_fd);
7c673cae
FG
233 if (r < 0) {
234 ldout(cct, 1) << __func__ << " server ack failed." << dendl;
235 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
31f18b77 236 fault();
7c673cae
FG
237 return ;
238 }
9f95a23c 239 } else { // second time: cm meta ack from client
7c673cae 240 connected = 1;
11fdf7f2
TL
241 ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
242 //cleanup();
31f18b77
FG
243 submit(false);
244 notify();
7c673cae
FG
245 }
246 }
247}
248
7c673cae
FG
249ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
250{
9f95a23c
TL
251 eventfd_t event_val = 0;
252 int r = eventfd_read(notify_fd, &event_val);
253 ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
254 << " r = " << r << dendl;
255
11fdf7f2
TL
256 if (!active) {
257 ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
258 return -EAGAIN;
259 }
9f95a23c 260
11fdf7f2
TL
261 if (0 == connected) {
262 ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
263 return -EAGAIN;
264 }
7c673cae 265 ssize_t read = 0;
9f95a23c
TL
266 read = read_buffers(buf,len);
267
268 if (is_server && connected == 0) {
269 ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
270 connected = 1; //if so, we don't need the last handshake
271 cleanup();
272 submit(false);
273 }
274
275 if (!buffers.empty()) {
276 notify();
277 }
278
279 if (read == 0 && error)
280 return -error;
281 return read == 0 ? -EAGAIN : read;
282}
7c673cae 283
9f95a23c
TL
284void RDMAConnectedSocketImpl::buffer_prefetch(void)
285{
7c673cae
FG
286 std::vector<ibv_wc> cqe;
287 get_wc(cqe);
9f95a23c
TL
288 if(cqe.empty())
289 return;
7c673cae 290
9f95a23c 291 for(size_t i = 0; i < cqe.size(); ++i) {
7c673cae 292 ibv_wc* response = &cqe[i];
11fdf7f2 293 ceph_assert(response->status == IBV_WC_SUCCESS);
7c673cae 294 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
7c673cae 295 chunk->prepare_read(response->byte_len);
9f95a23c
TL
296
297 if (chunk->get_size() == 0) {
298 chunk->reset_read_chunk();
7c673cae 299 dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
31f18b77 300 if (connected) {
7c673cae
FG
301 error = ECONNRESET;
302 ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
303 }
11fdf7f2 304 dispatcher->post_chunk_to_pool(chunk);
9f95a23c 305 continue;
7c673cae 306 } else {
9f95a23c
TL
307 buffers.push_back(chunk);
308 ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
7c673cae
FG
309 }
310 }
7c673cae 311 worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
7c673cae
FG
312}
313
314ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
315{
9f95a23c
TL
316 size_t read_size = 0, tmp = 0;
317 buffer_prefetch();
318 auto pchunk = buffers.begin();
319 while (pchunk != buffers.end()) {
320 tmp = (*pchunk)->read(buf + read_size, len - read_size);
321 read_size += tmp;
322 ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
323 << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
324
325 if ((*pchunk)->get_size() == 0) {
326 (*pchunk)->reset_read_chunk();
327 dispatcher->post_chunk_to_pool(*pchunk);
11fdf7f2 328 update_post_backlog();
9f95a23c
TL
329 ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
330 pchunk++;
7c673cae 331 }
7c673cae 332
9f95a23c
TL
333 if (read_size == len) {
334 break;
7c673cae 335 }
7c673cae
FG
336 }
337
9f95a23c
TL
338 buffers.erase(buffers.begin(), pchunk);
339 ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
340 worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
341 return read_size;
7c673cae
FG
342}
343
f67539c2 344ssize_t RDMAConnectedSocketImpl::send(ceph::buffer::list &bl, bool more)
7c673cae
FG
345{
346 if (error) {
31f18b77 347 if (!active)
7c673cae
FG
348 return -EPIPE;
349 return -error;
350 }
351 size_t bytes = bl.length();
352 if (!bytes)
353 return 0;
354 {
9f95a23c 355 std::lock_guard l{lock};
7c673cae 356 pending_bl.claim_append(bl);
31f18b77 357 if (!connected) {
9f95a23c 358 ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
7c673cae
FG
359 return bytes;
360 }
361 }
9f95a23c 362 ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
7c673cae
FG
363 ssize_t r = submit(more);
364 if (r < 0 && r != -EAGAIN)
365 return r;
366 return bytes;
367}
368
9f95a23c
TL
369size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
370 size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
371 const decltype(std::cbegin(pending_bl.buffers()))& end)
372{
373 ceph_assert(start != end);
374 auto chunk_idx = tx_buffers.size();
375 if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
376 ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
377 worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
378 return 0;
379 }
380
381 Chunk *current_chunk = tx_buffers[chunk_idx];
382 size_t write_len = 0;
383 while (start != end) {
384 const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
385
386 size_t slice_write_len = 0;
387 while (slice_write_len < start->length()) {
388 size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
389
390 slice_write_len += real_len;
391 write_len += real_len;
392 req_copy_len -= real_len;
393
394 if (current_chunk->full()) {
395 if (++chunk_idx == tx_buffers.size())
396 return write_len;
397 current_chunk = tx_buffers[chunk_idx];
398 }
399 }
400
401 ++start;
402 }
403 ceph_assert(req_copy_len == 0);
404 return write_len;
405}
406
7c673cae
FG
407ssize_t RDMAConnectedSocketImpl::submit(bool more)
408{
409 if (error)
410 return -error;
9f95a23c 411 std::lock_guard l{lock};
7c673cae
FG
412 size_t bytes = pending_bl.length();
413 ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
9f95a23c 414 << pending_bl.get_num_buffers() << dendl;
7c673cae
FG
415 if (!bytes)
416 return 0;
417
7c673cae 418 std::vector<Chunk*> tx_buffers;
11fdf7f2 419 auto it = std::cbegin(pending_bl.buffers());
9f95a23c
TL
420 auto copy_start = it;
421 size_t total_copied = 0, wait_copy_len = 0;
7c673cae 422 while (it != pending_bl.buffers().end()) {
9f95a23c
TL
423 if (ib->is_tx_buffer(it->raw_c_str())) {
424 if (wait_copy_len) {
425 size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
426 total_copied += copied;
427 if (copied < wait_copy_len)
7c673cae 428 goto sending;
9f95a23c 429 wait_copy_len = 0;
7c673cae 430 }
9f95a23c
TL
431 ceph_assert(copy_start == it);
432 tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
433 total_copied += it->length();
434 ++copy_start;
7c673cae 435 } else {
9f95a23c 436 wait_copy_len += it->length();
7c673cae
FG
437 }
438 ++it;
439 }
9f95a23c
TL
440 if (wait_copy_len)
441 total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
7c673cae
FG
442
443 sending:
9f95a23c 444 if (total_copied == 0)
7c673cae 445 return -EAGAIN;
9f95a23c 446 ceph_assert(total_copied <= pending_bl.length());
f67539c2 447 ceph::buffer::list swapped;
9f95a23c 448 if (total_copied < pending_bl.length()) {
7c673cae 449 worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
9f95a23c 450 pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
7c673cae
FG
451 pending_bl.swap(swapped);
452 } else {
453 pending_bl.clear();
454 }
455
456 ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
9f95a23c 457 << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
7c673cae
FG
458
459 int r = post_work_request(tx_buffers);
460 if (r < 0)
461 return r;
462
9f95a23c 463 ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
7c673cae
FG
464 return pending_bl.length() ? -EAGAIN : 0;
465}
466
467int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
468{
9f95a23c 469 ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
f67539c2 470 auto current_buffer = tx_buffers.begin();
7c673cae
FG
471 ibv_sge isge[tx_buffers.size()];
472 uint32_t current_sge = 0;
473 ibv_send_wr iswr[tx_buffers.size()];
474 uint32_t current_swr = 0;
475 ibv_send_wr* pre_wr = NULL;
11fdf7f2 476 uint32_t num = 0;
7c673cae 477
92f5a8d4 478 // FIPS zeroization audit 20191115: these memsets are not security related.
7c673cae
FG
479 memset(iswr, 0, sizeof(iswr));
480 memset(isge, 0, sizeof(isge));
11fdf7f2 481
7c673cae
FG
482 while (current_buffer != tx_buffers.end()) {
483 isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
484 isge[current_sge].length = (*current_buffer)->get_offset();
485 isge[current_sge].lkey = (*current_buffer)->mr->lkey;
486 ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
487
488 iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
489 iswr[current_swr].next = NULL;
490 iswr[current_swr].sg_list = &isge[current_sge];
491 iswr[current_swr].num_sge = 1;
492 iswr[current_swr].opcode = IBV_WR_SEND;
493 iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
7c673cae 494
11fdf7f2 495 num++;
7c673cae
FG
496 worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
497 if (pre_wr)
498 pre_wr->next = &iswr[current_swr];
499 pre_wr = &iswr[current_swr];
500 ++current_sge;
501 ++current_swr;
502 ++current_buffer;
503 }
504
9f95a23c 505 ibv_send_wr *bad_tx_work_request = nullptr;
7c673cae
FG
506 if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
507 ldout(cct, 1) << __func__ << " failed to send data"
508 << " (most probably should be peer not ready): "
509 << cpp_strerror(errno) << dendl;
510 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
511 return -errno;
512 }
513 worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
9f95a23c 514 ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
7c673cae
FG
515 return 0;
516}
517
518void RDMAConnectedSocketImpl::fin() {
519 ibv_send_wr wr;
92f5a8d4 520 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae 521 memset(&wr, 0, sizeof(wr));
11fdf7f2 522
7c673cae
FG
523 wr.wr_id = reinterpret_cast<uint64_t>(qp);
524 wr.num_sge = 0;
525 wr.opcode = IBV_WR_SEND;
526 wr.send_flags = IBV_SEND_SIGNALED;
9f95a23c 527 ibv_send_wr* bad_tx_work_request = nullptr;
7c673cae
FG
528 if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
529 ldout(cct, 1) << __func__ << " failed to send message="
530 << " ibv_post_send failed(most probably should be peer not ready): "
531 << cpp_strerror(errno) << dendl;
532 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
533 return ;
534 }
535}
536
31f18b77 537void RDMAConnectedSocketImpl::cleanup() {
9f95a23c
TL
538 if (read_handler && tcp_fd >= 0) {
539 (static_cast<C_handle_connection_read*>(read_handler))->close();
7c673cae 540 worker->center.submit_to(worker->center.get_id(), [this]() {
9f95a23c 541 worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
7c673cae 542 }, false);
9f95a23c
TL
543 delete read_handler;
544 read_handler = nullptr;
545 }
546 if (established_handler) {
547 (static_cast<C_handle_connection_established*>(established_handler))->close();
548 delete established_handler;
549 established_handler = nullptr;
7c673cae
FG
550 }
551}
552
553void RDMAConnectedSocketImpl::notify()
554{
9f95a23c
TL
555 eventfd_t event_val = 1;
556 int r = eventfd_write(notify_fd, event_val);
557 ceph_assert(r == 0);
7c673cae
FG
558}
559
31f18b77 560void RDMAConnectedSocketImpl::shutdown()
7c673cae 561{
31f18b77
FG
562 if (!error)
563 fin();
564 error = ECONNRESET;
7c673cae
FG
565 active = false;
566}
567
31f18b77 568void RDMAConnectedSocketImpl::close()
7c673cae 569{
31f18b77
FG
570 if (!error)
571 fin();
572 error = ECONNRESET;
573 active = false;
7c673cae
FG
574}
575
1e59de90
TL
576void RDMAConnectedSocketImpl::set_priority(int sd, int prio, int domain) {
577 ceph::NetHandler net(cct);
578 net.set_priority(sd, prio, domain);
579}
580
7c673cae
FG
581void RDMAConnectedSocketImpl::fault()
582{
31f18b77 583 ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
7c673cae 584 error = ECONNRESET;
31f18b77 585 connected = 1;
7c673cae
FG
586 notify();
587}
31f18b77
FG
588
589void RDMAConnectedSocketImpl::set_accept_fd(int sd)
590{
591 tcp_fd = sd;
592 is_server = true;
593 worker->center.submit_to(worker->center.get_id(), [this]() {
9f95a23c 594 worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
31f18b77
FG
595 }, true);
596}
11fdf7f2
TL
597
598void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
599{
9f95a23c 600 post_backlog += num - ib->post_chunks_to_rq(num, qp);
11fdf7f2
TL
601}
602
603void RDMAConnectedSocketImpl::update_post_backlog()
604{
605 if (post_backlog)
9f95a23c 606 post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
11fdf7f2 607}