// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "Infiniband.h"
#include "common/errno.h"
#include "common/debug.h"
#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "Infiniband "
static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
static const uint32_t MAX_INLINE_DATA = 0;
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
static const uint32_t CQ_DEPTH = 30000;
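
// TCP_MSG_LEN sizes the fixed-width handshake message exchanged over TCP
// before an RDMA connection is brought up; it matches the sprintf/sscanf
// format used in send_msg()/recv_msg() below:
//
//   "%04x:%08x:%08x:%08x:%s"  ->  lid:qpn:psn:peer_qpn:gid
//
// i.e. a 4-hex-digit LID, three 8-hex-digit fields, a 32-character wire
// GID, four ':' separators, plus the trailing NUL counted by sizeof().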
Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn)
  : ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
{
#ifdef HAVE_IBV_EXP
  union ibv_gid cgid;
  struct ibv_exp_gid_attr gid_attr;
  bool malformed = false;

  ldout(cct, 1) << __func__ << " using experimental verbs for gid" << dendl;
  int r = ibv_query_port(ctxt, port_num, port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr->lid;

  // search for requested GID in GIDs table
  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
                << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
             "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
             ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
             &cgid.raw[ 0], &cgid.raw[ 1],
             &cgid.raw[ 2], &cgid.raw[ 3],
             &cgid.raw[ 4], &cgid.raw[ 5],
             &cgid.raw[ 6], &cgid.raw[ 7],
             &cgid.raw[ 8], &cgid.raw[ 9],
             &cgid.raw[10], &cgid.raw[11],
             &cgid.raw[12], &cgid.raw[13],
             &cgid.raw[14], &cgid.raw[15]);

  if (r != 16) {
    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
    malformed = true;
  }

  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;

  for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
    if (r) {
      lderr(cct) << __func__ << " query gid of port " << port_num << " index "
                 << gid_idx << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }
    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
    if (r) {
      lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index "
                 << gid_idx << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }

    if (malformed) break; // stay with gid_idx=0
    if ((gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
        (memcmp(&gid, &cgid, 16) == 0)) {
      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
      break;
    }
  }

  if (gid_idx == port_attr->gid_tbl_len) {
    lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
    ceph_abort();
  }
#else
  int r = ibv_query_port(ctxt, port_num, port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr->lid;
  r = ibv_query_gid(ctxt, port_num, 0, &gid);
  if (r) {
    lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
#endif
}
Device::Device(CephContext *cct, ibv_device* d)
  : device(d), device_attr(new ibv_device_attr), active_port(nullptr)
{
  if (device == NULL) {
    lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
  name = ibv_get_device_name(device);
  ctxt = ibv_open_device(device);
  if (ctxt == NULL) {
    lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
  int r = ibv_query_device(ctxt, device_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}
void Device::binding_port(CephContext *cct, int port_num) {
  port_cnt = device_attr->phys_port_cnt;
  for (uint8_t i = 0; i < port_cnt; ++i) {
    Port *port = new Port(cct, ctxt, i+1);
    if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
      active_port = port;
      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
      break;
    } else {
      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: ("
                     << port->get_port_attr()->state << ")" << dendl;
      delete port;
    }
  }
  if (nullptr == active_port) {
    lderr(cct) << __func__ << " port not found" << dendl;
    assert(active_port);
  }
}
Infiniband::QueuePair::QueuePair(
    CephContext *c, Infiniband& infiniband, ibv_qp_type type,
    int port, ibv_srq *srq,
    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
  : cct(c), infiniband(infiniband),
    type(type),
    ctxt(infiniband.device->ctxt),
    ib_physical_port(port),
    pd(infiniband.pd->pd),
    srq(srq),
    qp(NULL),
    txcq(txcq),
    rxcq(rxcq),
    initial_psn(0),
    max_send_wr(max_send_wr),
    max_recv_wr(max_recv_wr),
    q_key(q_key),
    dead(false)
{
  initial_psn = lrand48() & 0xffffff;
  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
    lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
  pd = infiniband.pd->pd;
}
int Infiniband::QueuePair::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  ibv_qp_init_attr qpia;
  memset(&qpia, 0, sizeof(qpia));
  qpia.send_cq = txcq->get_cq();
  qpia.recv_cq = rxcq->get_cq();
  qpia.srq = srq;                      // use the same shared receive queue
  qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests
  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
  qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
  qpia.qp_type = type;                 // RC, UC, UD, or XRC
  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs

  qp = ibv_create_qp(pd, &qpia);
  if (qp == NULL) {
    lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
    if (errno == ENOMEM) {
      lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers,"
                    " ms_async_rdma_send_buffers or"
                    " ms_async_rdma_buffer_size" << dendl;
    }
    return -1;
  }

  ldout(cct, 20) << __func__ << " successfully create queue pair: "
                 << "qp=" << qp << dendl;

  // move from RESET to INIT state
  ibv_qp_attr qpa;
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_INIT;
  qpa.pkey_index = 0;
  qpa.port_num = (uint8_t)(ib_physical_port);
  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
  qpa.qkey = q_key;

  int mask = IBV_QP_STATE | IBV_QP_PORT;
  switch (type) {
    case IBV_QPT_RC:
      mask |= IBV_QP_ACCESS_FLAGS;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_UD:
      mask |= IBV_QP_QKEY;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_RAW_PACKET:
      break;
    default:
      ceph_abort();
  }

  int ret = ibv_modify_qp(qp, &qpa, mask);
  if (ret) {
    ibv_destroy_qp(qp);
    lderr(cct) << __func__ << " failed to transition to INIT state: "
               << cpp_strerror(errno) << dendl;
    return -errno;
  }
  ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
                 << " qp=" << qp << dendl;
  return 0;
}
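
// Note that init() only takes the queue pair from RESET to INIT. A hedged
// sketch of the transitions a caller still has to drive before traffic can
// flow (the attribute sets shown are placeholders, not the ones used here):
//
//   ibv_qp_attr a;
//   memset(&a, 0, sizeof(a));
//   a.qp_state = IBV_QPS_RTR;             // plus remote qpn/psn/address info
//   ibv_modify_qp(qp, &a, IBV_QP_STATE | /* RTR attrs */ ...);
//   a.qp_state = IBV_QPS_RTS;             // plus timeout/retry counts
//   ibv_modify_qp(qp, &a, IBV_QP_STATE | /* RTS attrs */ ...);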
/**
 * Change RC QueuePair into the ERROR state. It is necessary to move
 * the Queue Pair into the ERROR state and poll all of the relevant
 * Work Completions prior to destroying a Queue Pair, since destroying
 * a Queue Pair does not guarantee that its Work Completions are
 * removed from the CQ upon destruction. Even if the Work Completions
 * are already in the CQ, it might not be possible to retrieve them.
 * If the Queue Pair is associated with an SRQ, it is recommended to
 * wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED.
 *
 * \return
 *   -errno if the QueuePair can't switch to ERROR, 0 for success.
 */
int Infiniband::QueuePair::to_dead()
{
  if (dead)
    return 0;
  ibv_qp_attr qpa;
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_ERR;

  int mask = IBV_QP_STATE;
  int ret = ibv_modify_qp(qp, &qpa, mask);
  if (ret) {
    lderr(cct) << __func__ << " failed to transition to ERROR state: "
               << cpp_strerror(errno) << dendl;
    return -errno;
  }
  dead = true;
  return ret;
}
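
// Illustrative caller-side sequence (a sketch, not code from this file;
// `pending_wrs` and MAX_COMPLETIONS are hypothetical): after to_dead() the
// owner keeps polling the completion queue until the flushed work requests
// of this queue pair have been reaped, and only then destroys it:
//
//   qp->to_dead();                  // QP now in IBV_QPS_ERR, WRs flush
//   while (pending_wrs > 0)
//     pending_wrs -= cq->poll_cq(MAX_COMPLETIONS, wc); // IBV_WC_WR_FLUSH_ERR
//   delete qp;                      // safe to destroy now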
int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to query qp: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  *rqp = qpa.dest_qp_num;
  return 0;
}
/**
 * Get the remote infiniband address for this QueuePair, as set in #plumb().
 * LIDs are "local IDs" in infiniband terminology. They are short, locally
 * routable addresses.
 */
int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to query qp: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  *lid = qpa.ah_attr.dlid;
  return 0;
}
/**
 * Get the state of a QueuePair.
 */
int Infiniband::QueuePair::get_state() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return qpa.qp_state;
}
/**
 * Return true if the queue pair is in an error state, false otherwise.
 */
bool Infiniband::QueuePair::is_error() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, -1, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
               << cpp_strerror(errno) << dendl;
    return true;
  }
  return qpa.cur_qp_state == IBV_QPS_ERR;
}
Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
{
}
Infiniband::CompletionChannel::~CompletionChannel()
{
  if (channel) {
    int r = ibv_destroy_comp_channel(channel);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
  }
}
int Infiniband::CompletionChannel::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  channel = ibv_create_comp_channel(infiniband.device->ctxt);
  if (!channel) {
    lderr(cct) << __func__ << " failed to create receive completion channel: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  int rc = NetHandler(cct).set_nonblock(channel->fd);
  if (rc < 0) {
    ibv_destroy_comp_channel(channel);
    return -1;
  }
  return 0;
}
void Infiniband::CompletionChannel::ack_events()
{
  ibv_ack_cq_events(cq, cq_events_that_need_ack);
  cq_events_that_need_ack = 0;
}
bool Infiniband::CompletionChannel::get_cq_event()
{
  ibv_cq *cq = NULL;
  void *ev_ctx;
  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
    if (errno != EAGAIN && errno != EINTR)
      lderr(cct) << __func__ << " failed to retrieve CQ event: "
                 << cpp_strerror(errno) << dendl;
    return false;
  }

  /* accumulate number of cq events that need to
   * be acked, and periodically ack them
   */
  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
    ldout(cct, 20) << __func__ << " ack cq events." << dendl;
    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
    cq_events_that_need_ack = 0;
  }
  return true;
}
Infiniband::CompletionQueue::~CompletionQueue()
{
  if (cq) {
    int r = ibv_destroy_cq(cq);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
  }
}
int Infiniband::CompletionQueue::init()
{
  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
  if (!cq) {
    lderr(cct) << __func__ << " failed to create receive completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (ibv_req_notify_cq(cq, 0)) {
    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
    ibv_destroy_cq(cq);
    cq = NULL;
    return -1;
  }

  channel->bind_cq(cq);
  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
  return 0;
}
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  int r = ibv_req_notify_cq(cq, 0);
  if (r < 0)
    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
  return r;
}
int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
  if (r < 0) {
    lderr(cct) << __func__ << " polling completion queue failed: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return r;
}
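
// A minimal polling-loop sketch tying channel, queue and rearm together
// (illustrative only; the real loop lives in RDMADispatcher, and the
// `handle` callback and array size are assumptions):
//
//   ibv_wc wc[32];
//   while (channel->get_cq_event()) {        // consume a notification
//     int n;
//     while ((n = cq->poll_cq(32, wc)) > 0)
//       handle(wc, n);                       // hypothetical handler
//     cq->rearm_notify(false);               // ask for the next event
//   }
//   channel->ack_events();                   // ack whatever is outstanding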
Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
  : pd(ibv_alloc_pd(device->ctxt))
{
  if (pd == NULL) {
    lderr(cct) << __func__ << " failed to allocate infiniband protection domain: "
               << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

Infiniband::ProtectionDomain::~ProtectionDomain()
{
  ibv_dealloc_pd(pd);
}
Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
  : mr(m), bytes(len), offset(0), buffer(b)
{
}

Infiniband::MemoryManager::Chunk::~Chunk()
{
}

void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
{
  offset = o;
}

uint32_t Infiniband::MemoryManager::Chunk::get_offset()
{
  return offset;
}

void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
{
  bound = b;
}

void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
{
  offset = 0;
  bound = b;
}

uint32_t Infiniband::MemoryManager::Chunk::get_bound()
{
  return bound;
}

uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
{
  uint32_t left = bound - offset;
  if (left >= len) {
    memcpy(buf, buffer+offset, len);
    offset += len;
    return len;
  } else {
    memcpy(buf, buffer+offset, left);
    offset = 0;
    bound = 0;
    return left;
  }
}

uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
{
  uint32_t left = bytes - offset;
  if (left >= len) {
    memcpy(buffer+offset, buf, len);
    offset += len;
    return len;
  } else {
    memcpy(buffer+offset, buf, left);
    offset = bytes;
    return left;
  }
}

bool Infiniband::MemoryManager::Chunk::full()
{
  return offset == bytes;
}

bool Infiniband::MemoryManager::Chunk::over()
{
  return Infiniband::MemoryManager::Chunk::offset == bound;
}

void Infiniband::MemoryManager::Chunk::clear()
{
  offset = 0;
  bound = 0;
}

void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
{
  ib->post_chunk(this);
}
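
// The chunk cursors behave like a simple byte stream: `offset` is the
// current read/write position, `bound` the end of valid received data and
// `bytes` the capacity. An illustrative sequence (sizes are assumptions):
//
//   chunk->prepare_read(128);   // 128 bytes of payload arrived
//   char buf[64];
//   chunk->read(buf, 64);       // copies 64 bytes, offset -> 64
//   chunk->read(buf, 64);       // copies the remaining 64, offset -> 128
//   chunk->over();              // true: offset has reached bound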
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
  : manager(m), buffer_size(s), lock("cluster_lock")
{
}

Infiniband::MemoryManager::Cluster::~Cluster()
{
  int r = ibv_dereg_mr(chunk_base->mr);
  assert(r == 0);
  const auto chunk_end = chunk_base + num_chunk;
  for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
    chunk->~Chunk();
  }

  ::free(chunk_base);
  if (manager.enabled_huge_page)
    manager.free_huge_pages(base);
  else
    ::free(base);
}
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
  assert(!base);
  num_chunk = num;
  uint32_t bytes = buffer_size * num;
  if (manager.enabled_huge_page) {
    base = (char*)manager.malloc_huge_pages(bytes);
  } else {
    base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
  }
  end = base + bytes;
  assert(base);
  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
  memset(chunk_base, 0, sizeof(Chunk) * num);
  free_chunks.reserve(num);
  ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
  assert(m);
  Chunk* chunk = chunk_base;
  for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
    new(chunk) Chunk(m, buffer_size, base+offset);
    free_chunks.push_back(chunk);
    chunk++;
  }
  return 0;
}
void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
{
  Mutex::Locker l(lock);
  for (auto c : ck) {
    c->clear();
    free_chunks.push_back(c);
  }
}
int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
  uint32_t num = bytes / buffer_size + 1;
  if (bytes % buffer_size == 0)
    --num;
  int r = num;
  Mutex::Locker l(lock);
  if (free_chunks.empty())
    return 0;
  if (!bytes) {
    r = free_chunks.size();
    for (auto c : free_chunks)
      chunks.push_back(c);
    free_chunks.clear();
    return r;
  }
  if (free_chunks.size() < num) {
    num = free_chunks.size();
    r = num;
  }
  for (uint32_t i = 0; i < num; ++i) {
    chunks.push_back(free_chunks.back());
    free_chunks.pop_back();
  }
  return r;
}
Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
  : device(d), pd(p)
{
  enabled_huge_page = hugepage;
}

Infiniband::MemoryManager::~MemoryManager()
{
  if (channel)
    delete channel;
  if (send)
    delete send;
}
void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
{
  size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,
                           MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
  if (ptr == MAP_FAILED) {
    ptr = (char *)malloc(real_size);
    if (ptr == NULL) return NULL;
    real_size = 0;
  }
  *((size_t *)ptr) = real_size;
  return ptr + HUGE_PAGE_SIZE;
}
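
// Layout of a malloc_huge_pages() allocation: the true mapping size is
// stashed in the first size_t of the region, and the caller gets a pointer
// advanced by HUGE_PAGE_SIZE so free_huge_pages() can find it again:
//
//   real_ptr              real_ptr + HUGE_PAGE_SIZE (returned to caller)
//   |  size_t real_size   |  caller-visible buffer of `size` bytes ... |
//
// real_size == 0 marks the plain-malloc fallback, which free_huge_pages()
// must release with free() rather than munmap().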
void Infiniband::MemoryManager::free_huge_pages(void *ptr)
{
  if (ptr == NULL) return;
  void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE;
  size_t real_size = *((size_t *)real_ptr);
  assert(real_size % HUGE_PAGE_SIZE == 0);
  if (real_size != 0)
    munmap(real_ptr, real_size);
  else
    free(real_ptr);
}
void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
{
  assert(device);
  assert(pd);
  channel = new Cluster(*this, size);
  channel->fill(rx_num);

  send = new Cluster(*this, size);
  send->fill(tx_num);
}

void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
{
  send->take_back(chunks);
}
int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return send->get_buffers(c, bytes);
}

int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
  return channel->get_buffers(chunks, bytes);
}
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
  : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
{
}
void Infiniband::init()
{
  Mutex::Locker l(lock);

  device_list = new DeviceList(cct);

  device = device_list->get_device(device_name.c_str());
  device->binding_port(cct, port_num);

  ib_physical_port = device->active_port->get_port_num();
  pd = new ProtectionDomain(cct, device);
  assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);

  max_recv_wr = device->device_attr->max_srq_wr;
  if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
    max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
    ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
  } else {
    ldout(cct, 1) << __func__ << " using the max allowed receive buffers: "
                  << max_recv_wr << dendl;
  }

  max_send_wr = device->device_attr->max_qp_wr;
  if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
    max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
    ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
  } else {
    ldout(cct, 1) << __func__ << " using the max allowed send buffers: "
                  << max_send_wr << dendl;
  }

  ldout(cct, 1) << __func__ << " device allows " << device->device_attr->max_cqe
                << " completion entries" << dendl;

  memory_manager = new MemoryManager(device, pd,
                                     cct->_conf->ms_async_rdma_enable_hugepage);
  memory_manager->register_rx_tx(
      cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);

  srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
  post_channel_cluster();

  dispatcher->polling_start();
}
Infiniband::~Infiniband()
{
  if (dispatcher)
    dispatcher->polling_stop();

  ibv_destroy_srq(srq);
  delete memory_manager;
  delete pd;
}
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
  assert(!d ^ !dispatcher);
  dispatcher = d;
}
/**
 * Create a shared receive queue. This basically wraps the verbs call.
 *
 * \param[in] max_wr
 *      The max number of outstanding work requests in the SRQ.
 * \param[in] max_sge
 *      The max number of scatter elements per WR.
 * \return
 *      A valid ibv_srq pointer, or NULL on error.
 */
ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
{
  ibv_srq_init_attr sia;
  memset(&sia, 0, sizeof(sia));
  sia.srq_context = device->ctxt;
  sia.attr.max_wr = max_wr;
  sia.attr.max_sge = max_sge;
  return ibv_create_srq(pd->pd, &sia);
}
int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return memory_manager->get_send_buffers(c, bytes);
}
/**
 * Create a new QueuePair. This factory should be used in preference to
 * the QueuePair constructor directly, since this lets derivatives of
 * Infiniband, e.g. MockInfiniband (if it existed),
 * return mocked out QueuePair derivatives.
 *
 * \return
 *      QueuePair on success or NULL if init fails
 * See QueuePair::QueuePair for parameter documentation.
 */
Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
                                                     CompletionQueue* rx, ibv_qp_type type)
{
  Infiniband::QueuePair *qp = new QueuePair(
      cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
  if (qp->init()) {
    delete qp;
    return NULL;
  }
  return qp;
}
int Infiniband::post_chunk(Chunk* chunk)
{
  ibv_sge isge;
  isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
  isge.length = chunk->bytes;
  isge.lkey = chunk->mr->lkey;
  ibv_recv_wr rx_work_request;

  memset(&rx_work_request, 0, sizeof(rx_work_request));
  rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk); // stash descriptor ptr
  rx_work_request.next = NULL;
  rx_work_request.sg_list = &isge;
  rx_work_request.num_sge = 1;

  ibv_recv_wr *badWorkRequest;
  int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
  if (ret)
    return -errno;
  return 0;
}
int Infiniband::post_channel_cluster()
{
  vector<Chunk*> free_chunks;
  int r = memory_manager->get_channel_buffers(free_chunks, 0);
  assert(r > 0);
  for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
    r = post_chunk(*iter);
    assert(r == 0);
  }
  return 0;
}
Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
{
  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
  if (cc->init()) {
    delete cc;
    return NULL;
  }
  return cc;
}

Infiniband::CompletionQueue* Infiniband::create_comp_queue(
    CephContext *cct, CompletionChannel *cc)
{
  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
      cct, *this, CQ_DEPTH, cc);
  if (cq->init()) {
    delete cq;
    return NULL;
  }
  return cq;
}
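
// A hedged end-to-end wiring sketch using the factories above (the local
// names are assumptions; the real callers live in the RDMAStack code):
//
//   Infiniband::CompletionChannel *cc = ib->create_comp_channel(cct);
//   Infiniband::CompletionQueue *tx = ib->create_comp_queue(cct, cc);
//   Infiniband::CompletionQueue *rx = ib->create_comp_queue(cct, cc);
//   Infiniband::QueuePair *qp = ib->create_queue_pair(cct, tx, rx, IBV_QPT_RC);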
// Returns 1 if no valid buffer was read, 0 if enough buffer was read,
// and < 0 on error.
int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
{
  char msg[TCP_MSG_LEN];
  char gid[33];
  ssize_t r = ::read(sd, &msg, sizeof(msg));
  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
      return -EINVAL;
    }
  }
  if (r < 0) {
    r = -errno;
    lderr(cct) << __func__ << " got error " << r << ": "
               << cpp_strerror(r) << dendl;
  } else if (r == 0) { // valid disconnect message of length 0
    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
  } else if ((size_t)r != sizeof(msg)) { // invalid message
    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
    r = -EINVAL;
  } else { // valid message
    sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn), gid);
    wire_gid_to_gid(gid, &(im.gid));
    ldout(cct, 5) << __func__ << " received: " << im.lid << ", " << im.qpn
                  << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
  }
  return r;
}
int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
{
  int retry = 0;
  ssize_t r;

  char msg[TCP_MSG_LEN];
  char gid[33];
retry:
  gid_to_wire_gid(&(im.gid), gid);
  sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
  ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
                 << ", " << im.peer_qpn << ", " << gid << dendl;
  r = ::write(sd, msg, sizeof(msg));
  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
      return -EINVAL;
    }
  }

  if ((size_t)r != sizeof(msg)) {
    // FIXME need to handle EAGAIN instead of retry
    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
      retry++;
      goto retry;
    }
    if (r < 0)
      lderr(cct) << __func__ << " send returned error " << errno << ": "
                 << cpp_strerror(errno) << dendl;
    else
      lderr(cct) << __func__ << " send got bad length (" << r << ") "
                 << cpp_strerror(errno) << dendl;
    return -errno;
  }
  return 0;
}
void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
{
  char tmp[9];
  uint32_t v32;
  int i;

  for (tmp[8] = 0, i = 0; i < 4; ++i) {
    memcpy(tmp, wgid + i * 8, 8);
    sscanf(tmp, "%x", &v32);
    *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
  }
}

void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
{
  for (int i = 0; i < 4; ++i)
    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
}
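
// Round-trip sketch of the wire GID encoding (an all-zero GID for
// illustration):
//
//   union ibv_gid gid = {};      // 16 raw bytes
//   char wgid[33];               // 32 hex chars + NUL
//   gid_to_wire_gid(&gid, wgid); // -> "00000000000000000000000000000000"
//   wire_gid_to_gid(wgid, &gid); // parses 4 x 8 hex chars, ntohl'd back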
Infiniband::QueuePair::~QueuePair()
{
  if (qp) {
    ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
    assert(!ibv_destroy_qp(qp));
  }
}
/**
 * Return a string representation of the `status' field from a Verbs
 * struct `ibv_wc'.
 *
 * \param[in] status
 *      The integer status obtained in ibv_wc.status.
 * \return
 *      A string corresponding to the given status.
 */
const char* Infiniband::wc_status_to_string(int status)
{
  static const char *lookup[] = {
    "SUCCESS",
    "LOC_LEN_ERR",
    "LOC_QP_OP_ERR",
    "LOC_EEC_OP_ERR",
    "LOC_PROT_ERR",
    "WR_FLUSH_ERR",
    "MW_BIND_ERR",
    "BAD_RESP_ERR",
    "LOC_ACCESS_ERR",
    "REM_INV_REQ_ERR",
    "REM_ACCESS_ERR",
    "REM_OP_ERR",
    "RETRY_EXC_ERR",
    "RNR_RETRY_EXC_ERR",
    "LOC_RDD_VIOL_ERR",
    "REM_INV_RD_REQ_ERR",
    "REM_ABORT_ERR",
    "INV_EECN_ERR",
    "INV_EEC_STATE_ERR",
    "FATAL_ERR",
    "RESP_TIMEOUT_ERR",
    "GENERAL_ERR"
  };

  if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
    return "<status out of range!>";
  return lookup[status];
}
const char* Infiniband::qp_state_string(int status) {
  switch (status) {
    case IBV_QPS_RESET : return "IBV_QPS_RESET";
    case IBV_QPS_INIT  : return "IBV_QPS_INIT";
    case IBV_QPS_RTR   : return "IBV_QPS_RTR";
    case IBV_QPS_RTS   : return "IBV_QPS_RTS";
    case IBV_QPS_SQD   : return "IBV_QPS_SQD";
    case IBV_QPS_SQE   : return "IBV_QPS_SQE";
    case IBV_QPS_ERR   : return "IBV_QPS_ERR";
    default: return " out of range.";
  }
}