// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "Infiniband.h"
#include "RDMAStack.h"

#include "common/errno.h"
#include "common/debug.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "Infiniband "
static const uint32_t MAX_INLINE_DATA = 0;
Infiniband::QueuePair::QueuePair(
    CephContext *c, Device &device, ibv_qp_type type,
    int port, ibv_srq *srq,
    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
    uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
  : cct(c), ibdev(device),
    type(type),
    ctxt(ibdev.ctxt),
    ib_physical_port(port),
    pd(ibdev.pd->pd),
    srq(srq),
    qp(NULL),
    txcq(txcq),
    rxcq(rxcq),
    initial_psn(0),
    max_send_wr(max_send_wr),
    max_recv_wr(max_recv_wr),
    q_key(q_key),
    dead(false)
{
  initial_psn = lrand48() & 0xffffff;
  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
    lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}
int Infiniband::QueuePair::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  ibv_qp_init_attr qpia;
  memset(&qpia, 0, sizeof(qpia));
  qpia.send_cq = txcq->get_cq();
  qpia.recv_cq = rxcq->get_cq();
  qpia.srq = srq;                             // use the same shared receive queue
  qpia.cap.max_send_wr = max_send_wr;         // max outstanding send requests
  qpia.cap.max_send_sge = 1;                  // max send scatter-gather elements
  qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
  qpia.qp_type = type;                        // RC, UC, UD, or XRC
  qpia.sq_sig_all = 0;                        // only generate CQEs on requested WQEs

  qp = ibv_create_qp(pd, &qpia);
  if (qp == NULL) {
    lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
    if (errno == ENOMEM) {
      lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
                 " ms_async_rdma_send_buffers or"
                 " ms_async_rdma_buffer_size" << dendl;
    }
    return -1;
  }
  ldout(cct, 20) << __func__ << " successfully create queue pair: "
                 << "qp=" << qp << dendl;

  // move from RESET to INIT state
  ibv_qp_attr qpa;
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state   = IBV_QPS_INIT;
  qpa.pkey_index = 0;
  qpa.port_num   = (uint8_t)(ib_physical_port);
  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
  qpa.qkey       = q_key;

  int mask = IBV_QP_STATE | IBV_QP_PORT;
  switch (type) {
    case IBV_QPT_RC:
      mask |= IBV_QP_ACCESS_FLAGS;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_UD:
      mask |= IBV_QP_QKEY;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_RAW_PACKET:
      break;
    default:
      ceph_abort();
  }

  int ret = ibv_modify_qp(qp, &qpa, mask);
  if (ret) {
    lderr(cct) << __func__ << " failed to transition to INIT state: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
                 << " qp=" << qp << dendl;
  return 0;
}
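// init() above only brings the queue pair from RESET to INIT. For an RC QP
// the connection is not usable until the later RTR/RTS transitions are
// performed elsewhere in the RDMA stack, once the peer's QPN/LID/PSN are
// known. A minimal sketch of those remaining ibv_modify_qp() calls (standard
// verbs usage, not code from this file; peer_* values are placeholders
// learned during connection setup):
//
//   ibv_qp_attr a;
//   memset(&a, 0, sizeof(a));
//   a.qp_state = IBV_QPS_RTR;
//   a.path_mtu = IBV_MTU_1024;
//   a.dest_qp_num = peer_qpn;
//   a.rq_psn = peer_psn;
//   a.max_dest_rd_atomic = 1;
//   a.min_rnr_timer = 12;
//   a.ah_attr.dlid = peer_lid;
//   a.ah_attr.port_num = ib_physical_port;
//   ibv_modify_qp(qp, &a, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
//                 IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
//                 IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER);
//
//   memset(&a, 0, sizeof(a));
//   a.qp_state = IBV_QPS_RTS;
//   a.timeout = 14;
//   a.retry_cnt = 7;
//   a.rnr_retry = 7;
//   a.sq_psn = initial_psn;
//   a.max_rd_atomic = 1;
//   ibv_modify_qp(qp, &a, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
//                 IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC);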
/**
 * Change the RC QueuePair into the ERROR state. It is necessary to move
 * the Queue Pair into the ERROR state and poll all of the relevant
 * Work Completions prior to destroying it, since destroying a Queue Pair
 * does not guarantee that its Work Completions are removed from the CQ
 * upon destruction. Even if the Work Completions are already in the CQ,
 * it might not be possible to retrieve them. If the Queue Pair is
 * associated with an SRQ, it is recommended to wait for the affiliated
 * event IBV_EVENT_QP_LAST_WQE_REACHED.
 *
 * \return
 *   -errno if the QueuePair can't switch to ERROR,
 *   0 for success.
 */
int Infiniband::QueuePair::to_dead()
{
  if (dead)
    return 0;
  ibv_qp_attr qpa;
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_ERR;

  int mask = IBV_QP_STATE;
  int ret = ibv_modify_qp(qp, &qpa, mask);
  if (ret) {
    lderr(cct) << __func__ << " failed to transition to ERROR state: "
               << cpp_strerror(errno) << dendl;
    return -errno;
  }
  dead = true;
  return ret;
}
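// A rough sketch of the teardown order the comment above describes (caller
// code, not part of this file; names such as `qp`, `tx_cq` and `MAX_WC` are
// illustrative):
//
//   qp->to_dead();                       // QP now flushes outstanding WRs
//   ibv_wc wc[MAX_WC];
//   int n;
//   do {
//     n = tx_cq->poll_cq(MAX_WC, wc);    // drain the flushed completions
//   } while (n > 0);
//   delete qp;                           // only now destroy the QP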
int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to query qp: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (rqp)
    *rqp = qpa.dest_qp_num;
  return 0;
}
/**
 * Get the remote InfiniBand address for this QueuePair, as set in #plumb().
 * LIDs are "local IDs" in InfiniBand terminology. They are short, locally
 * routable addresses.
 */
int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to query qp: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (lid)
    *lid = qpa.ah_attr.dlid;
  return 0;
}
/**
 * Get the state of a QueuePair.
 */
int Infiniband::QueuePair::get_state() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return qpa.qp_state;
}
/**
 * Return true if the queue pair is in an error state, false otherwise.
 */
bool Infiniband::QueuePair::is_error() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, -1, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
               << cpp_strerror(errno) << dendl;
    return true;
  }
  return qpa.cur_qp_state == IBV_QPS_ERR;
}
Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev)
  : cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
{
}
Infiniband::CompletionChannel::~CompletionChannel()
{
  if (channel) {
    int r = ibv_destroy_comp_channel(channel);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
    assert(r == 0);
  }
}
int Infiniband::CompletionChannel::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  channel = ibv_create_comp_channel(ibdev.ctxt);
  if (!channel) {
    lderr(cct) << __func__ << " failed to create receive completion channel: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  int rc = NetHandler(cct).set_nonblock(channel->fd);
  if (rc < 0) {
    ibv_destroy_comp_channel(channel);
    return -1;
  }
  return 0;
}
void Infiniband::CompletionChannel::ack_events()
{
  ibv_ack_cq_events(cq, cq_events_that_need_ack);
  cq_events_that_need_ack = 0;
}
bool Infiniband::CompletionChannel::get_cq_event()
{
  ibv_cq *cq = NULL;
  void *ev_ctx;
  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
    if (errno != EAGAIN && errno != EINTR)
      lderr(cct) << __func__ << " failed to retrieve CQ event: "
                 << cpp_strerror(errno) << dendl;
    return false;
  }

  /* accumulate number of cq events that need to
   * be acked, and periodically ack them
   */
  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
    ldout(cct, 20) << __func__ << " ack cq events." << dendl;
    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
    cq_events_that_need_ack = 0;
  }

  return true;
}
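// get_cq_event() only consumes the channel notification; a typical consumer
// (a sketch, not code from this file; `n`, `wc`, `MAX_WC` and `handle` are
// illustrative) pairs it with polling and re-arming:
//
//   if (channel->get_cq_event()) {
//     cq->rearm_notify(false);                  // request the next notification
//     while ((n = cq->poll_cq(MAX_WC, wc)) > 0)
//       handle(wc, n);                          // hypothetical completion handler
//   }
//
// Events are acknowledged in batches of MAX_ACK_EVENT above because
// ibv_ack_cq_events() is relatively expensive (it takes a mutex internally).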
Infiniband::CompletionQueue::~CompletionQueue()
{
  if (cq) {
    int r = ibv_destroy_cq(cq);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
    assert(r == 0);
  }
}
int Infiniband::CompletionQueue::init()
{
  cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0);
  if (!cq) {
    lderr(cct) << __func__ << " failed to create receive completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (ibv_req_notify_cq(cq, 0)) {
    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
    ibv_destroy_cq(cq);
    cq = NULL;
    return -1;
  }

  channel->bind_cq(cq);
  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
  return 0;
}
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  int r = ibv_req_notify_cq(cq, 0);
  if (r < 0)
    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
  return r;
}
int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
  if (r < 0) {
    lderr(cct) << __func__ << " failed to poll completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return r;
}
Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
  : pd(ibv_alloc_pd(device->ctxt))
{
  if (pd == NULL) {
    lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

Infiniband::ProtectionDomain::~ProtectionDomain()
{
  int rc = ibv_dealloc_pd(pd);
  assert(rc == 0);
}
Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
  : mr(m), bytes(len), offset(0), buffer(b)
{
}

Infiniband::MemoryManager::Chunk::~Chunk()
{
  assert(ibv_dereg_mr(mr) == 0);
}
void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
{
  offset = o;
}

uint32_t Infiniband::MemoryManager::Chunk::get_offset()
{
  return offset;
}

void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
{
  bound = b;
}

void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
{
  offset = 0;
  bound = b;
}

uint32_t Infiniband::MemoryManager::Chunk::get_bound()
{
  return bound;
}
uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
{
  uint32_t left = bound - offset;
  if (left >= len) {
    memcpy(buf, buffer+offset, len);
    offset += len;
    return len;
  } else {
    memcpy(buf, buffer+offset, left);
    offset = 0;
    bound = 0;
    return left;
  }
}

uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
{
  uint32_t left = bytes - offset;
  if (left >= len) {
    memcpy(buffer+offset, buf, len);
    offset += len;
    return len;
  } else {
    memcpy(buffer+offset, buf, left);
    offset = bytes;
    return left;
  }
}
bool Infiniband::MemoryManager::Chunk::full()
{
  return offset == bytes;
}

bool Infiniband::MemoryManager::Chunk::over()
{
  return Infiniband::MemoryManager::Chunk::offset == bound;
}

void Infiniband::MemoryManager::Chunk::clear()
{
  offset = 0;
  bound = 0;
}
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
  : manager(m), buffer_size(s), lock("cluster_lock")
{
}
Infiniband::MemoryManager::Cluster::~Cluster()
{
  const auto chunk_end = chunk_base + num_chunk;
  for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
    chunk->~Chunk();
  }

  ::free(chunk_base);
  if (manager.enabled_huge_page)
    manager.free_huge_pages(base);
  else
    ::free(base);
}
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
  assert(!base);
  num_chunk = num;
  uint32_t bytes = buffer_size * num;
  if (manager.enabled_huge_page) {
    base = (char*)manager.malloc_huge_pages(bytes);
  } else {
    base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
  }
  assert(base);
  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
  memset(chunk_base, 0, sizeof(Chunk) * num);
  free_chunks.reserve(num);
  Chunk* chunk = chunk_base;
  for (uint32_t offset = 0; offset < bytes; offset += buffer_size) {
    // register each buffer_size slice as its own memory region
    ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, buffer_size,
                           IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
    assert(m);
    new(chunk) Chunk(m, buffer_size, base+offset);
    free_chunks.push_back(chunk);
    chunk++;
  }
  return 0;
}
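// Each chunk registered above carries its own ibv_mr, so its lkey can be used
// directly in a scatter-gather entry. A minimal sketch of posting one chunk
// to a shared receive queue (standard verbs usage, not code from this file;
// `srq` and the chunk variable are assumed to exist):
//
//   Chunk *c = ...;                  // a chunk taken from this cluster
//   ibv_sge sge;
//   sge.addr   = (uint64_t)c->buffer;
//   sge.length = c->bytes;
//   sge.lkey   = c->mr->lkey;
//
//   ibv_recv_wr wr, *bad_wr;
//   memset(&wr, 0, sizeof(wr));
//   wr.wr_id   = (uint64_t)c;        // recover the chunk from the completion
//   wr.sg_list = &sge;
//   wr.num_sge = 1;
//   ibv_post_srq_recv(srq, &wr, &bad_wr);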
void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
{
  Mutex::Locker l(lock);
  for (auto c : ck) {
    c->clear();
    free_chunks.push_back(c);
  }
}
int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
  uint32_t num = bytes / buffer_size + 1;
  if (bytes % buffer_size == 0)
    --num;
  int r = num;
  Mutex::Locker l(lock);
  if (free_chunks.empty())
    return 0;
  if (!bytes) {
    // bytes == 0 means "give me everything you have"
    r = free_chunks.size();
    for (auto c : free_chunks)
      chunks.push_back(c);
    free_chunks.clear();
    return r;
  }
  if (free_chunks.size() < num) {
    num = free_chunks.size();
    r = num;
  }
  for (uint32_t i = 0; i < num; ++i) {
    chunks.push_back(free_chunks.back());
    free_chunks.pop_back();
  }
  return r;
}
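// The arithmetic at the top of get_buffers() is a ceiling division:
// num = bytes / buffer_size + 1, then decremented when bytes is an exact
// multiple of buffer_size. For example, with buffer_size = 8192:
//   bytes = 20000 -> num = 2 + 1 = 3 chunks
//   bytes = 16384 -> num = 2 + 1 - 1 = 2 chunks
//   bytes = 0     -> the !bytes branch hands out every free chunk instead.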
Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
  : device(d), pd(p)
{
  enabled_huge_page = hugepage;
}

Infiniband::MemoryManager::~MemoryManager()
{
  if (channel)
    delete channel;
  if (send)
    delete send;
}
void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
{
  size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,
                           MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
                           -1, 0);
  if (ptr == MAP_FAILED) {
    // fall back to a plain allocation; real_size == 0 marks this path
    ptr = (char *)malloc(real_size);
    if (ptr == NULL) return NULL;
    real_size = 0;
  }
  // stash the mapping size in the header so free_huge_pages() knows how the
  // memory was obtained, and hand the caller the region past the header
  *((size_t *)ptr) = real_size;
  return ptr + HUGE_PAGE_SIZE;
}
void Infiniband::MemoryManager::free_huge_pages(void *ptr)
{
  if (ptr == NULL) return;
  void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE;
  size_t real_size = *((size_t *)real_ptr);
  assert(real_size % HUGE_PAGE_SIZE == 0);
  if (real_size != 0)
    munmap(real_ptr, real_size);
  else
    free(real_ptr);
}
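// Layout used by malloc_huge_pages()/free_huge_pages() above: the allocation
// is padded by one HUGE_PAGE_SIZE header in which the real mapping size is
// stored, and the caller receives a pointer just past that header.
//
//   real_ptr                          returned ptr
//   |                                 |
//   [ size_t real_size | ..header.. ] [ usable buffer .............. ]
//   |<------ HUGE_PAGE_SIZE -------->|<----- size (rounded up) ----->|
//
// real_size == 0 marks the plain-malloc fallback, so free_huge_pages() knows
// whether to munmap() or free().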
void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
{
  assert(device);
  assert(pd);
  channel = new Cluster(*this, size);
  channel->fill(rx_num);

  send = new Cluster(*this, size);
  send->fill(tx_num);
}
void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
{
  send->take_back(chunks);
}

int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return send->get_buffers(c, bytes);
}

int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
  return channel->get_buffers(chunks, bytes);
}
Infiniband::Infiniband(CephContext *cct)
  : cct(cct), lock("IB lock")
{
}

Infiniband::~Infiniband()
{
  if (dispatcher)
    dispatcher->polling_stop();

  delete device_list;
}
void Infiniband::init()
{
  Mutex::Locker l(lock);

  if (initialized)
    return;

  device_list = new DeviceList(cct, this);
  initialized = true;

  dispatcher->polling_start();
}
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
  assert(!d ^ !dispatcher);

  dispatcher = d;
}
Device* Infiniband::get_device(const char* device_name)
{
  return device_list->get_device(device_name);
}

Device *Infiniband::get_device(const struct ibv_context *ctxt)
{
  return device_list->get_device(ctxt);
}
Infiniband::QueuePair::~QueuePair()
{
  if (qp) {
    ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
    assert(!ibv_destroy_qp(qp));
  }
}
/**
 * Return a string representation of the `status' field from a Verbs
 * struct ibv_wc.
 *
 * \param[in] status
 *      The integer status obtained in ibv_wc.status.
 * \return
 *      A string corresponding to the given status.
 */
const char* Infiniband::wc_status_to_string(int status)
{
  static const char *lookup[] = {
      "SUCCESS",
      "LOC_LEN_ERR",
      "LOC_QP_OP_ERR",
      "LOC_EEC_OP_ERR",
      "LOC_PROT_ERR",
      "WR_FLUSH_ERR",
      "MW_BIND_ERR",
      "BAD_RESP_ERR",
      "LOC_ACCESS_ERR",
      "REM_INV_REQ_ERR",
      "REM_ACCESS_ERR",
      "REM_OP_ERR",
      "RETRY_EXC_ERR",
      "RNR_RETRY_EXC_ERR",
      "LOC_RDD_VIOL_ERR",
      "REM_INV_RD_REQ_ERR",
      "REM_ABORT_ERR",
      "INV_EECN_ERR",
      "INV_EEC_STATE_ERR",
      "FATAL_ERR",
      "RESP_TIMEOUT_ERR",
      "GENERAL_ERR"
  };

  if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
    return "<status out of range!>";
  return lookup[status];
}
const char* Infiniband::qp_state_string(int status) {
  switch (status) {
    case IBV_QPS_RESET : return "IBV_QPS_RESET";
    case IBV_QPS_INIT  : return "IBV_QPS_INIT";
    case IBV_QPS_RTR   : return "IBV_QPS_RTR";
    case IBV_QPS_RTS   : return "IBV_QPS_RTS";
    case IBV_QPS_SQD   : return "IBV_QPS_SQD";
    case IBV_QPS_SQE   : return "IBV_QPS_SQE";
    case IBV_QPS_ERR   : return "IBV_QPS_ERR";
    default: return " out of range.";
  }
}
void Infiniband::handle_pre_fork()
{
  device_list->uninit();
}

int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
{
  return device_list->poll_tx(n, d, wc);
}

int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc)
{
  return device_list->poll_rx(n, d, wc);
}

int Infiniband::poll_blocking(bool &done)
{
  return device_list->poll_blocking(done);
}

void Infiniband::rearm_notify()
{
  device_list->rearm_notify();
}

void Infiniband::handle_async_event()
{
  device_list->handle_async_event();
}