// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#ifndef CEPH_INFINIBAND_H
#define CEPH_INFINIBAND_H
#include <boost/pool/pool.hpp>
// need this because boost messes with ceph log/assert definitions
#include "include/ceph_assert.h"

#include <infiniband/verbs.h>
#include <rdma/rdma_cma.h>

#include "include/common_fwd.h"
#include "include/int_types.h"
#include "include/page.h"
#include "include/scope_guard.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/ceph_mutex.h"
#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
#define HUGE_PAGE_SIZE_2MB (2 * 1024 * 1024)
#define ALIGN_TO_PAGE_2MB(x) \
  (((x) + (HUGE_PAGE_SIZE_2MB - 1)) & ~(HUGE_PAGE_SIZE_2MB - 1))
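
// Quick sanity sketch of the rounding macro (illustrative additions, not part
// of the original header): sizes are rounded up to the next 2MB hugepage
// boundary, and already-aligned sizes are left unchanged.
static_assert(ALIGN_TO_PAGE_2MB(1) == HUGE_PAGE_SIZE_2MB);
static_assert(ALIGN_TO_PAGE_2MB(HUGE_PAGE_SIZE_2MB) == HUGE_PAGE_SIZE_2MB);
static_assert(ALIGN_TO_PAGE_2MB(HUGE_PAGE_SIZE_2MB + 1) == 2 * HUGE_PAGE_SIZE_2MB);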

// InfiniBand packet sequence numbers are 24 bits wide.
#define PSN_LEN 24
#define PSN_MSK ((1 << PSN_LEN) - 1)

#define BEACON_WRID 0xDEADBEEF

// connection-management handshake data exchanged with the peer
struct ib_cm_meta_t {
  uint16_t lid;
  uint32_t local_qpn;
  uint32_t psn;
  uint32_t peer_qpn;
  union ibv_gid gid;
} __attribute__((packed));

class Port {
  struct ibv_context* ctxt;
  int port_num;
  struct ibv_port_attr port_attr;
  uint16_t lid;
  int gid_idx;
  union ibv_gid gid;

 public:
  explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn);
  uint16_t get_lid() { return lid; }
  ibv_gid get_gid() { return gid; }
  int get_port_num() { return port_num; }
  ibv_port_attr* get_port_attr() { return &port_attr; }
  int get_gid_idx() { return gid_idx; }
};

class Device {
 public:
  explicit Device(CephContext *c, ibv_device* ib_dev);
  explicit Device(CephContext *c, ibv_context *ib_ctx);
  ~Device() {
    ceph_assert(ibv_close_device(ctxt) == 0);
  }
  const char* get_name() { return name; }
  uint16_t get_lid() { return active_port->get_lid(); }
  ibv_gid get_gid() { return active_port->get_gid(); }
  int get_gid_idx() { return active_port->get_gid_idx(); }
  void binding_port(CephContext *c, int port_num);
  struct ibv_context *ctxt;
  ibv_device_attr device_attr;
  Port* active_port = nullptr;

 private:
  const char* name;
};

class DeviceList {
  struct ibv_device** device_list;
  struct ibv_context** device_context_list;
  int num;
  Device** devices;

 public:
  explicit DeviceList(CephContext *cct): device_list(nullptr), device_context_list(nullptr),
                                         num(0), devices(nullptr) {
    device_list = ibv_get_device_list(&num);
    ceph_assert(device_list);
    ceph_assert(num);
    // when the RDMA connection manager is in use, devices are enumerated
    // through librdmacm contexts instead of raw verbs device handles
    if (cct->_conf->ms_async_rdma_cm) {
      device_context_list = rdma_get_devices(NULL);
      ceph_assert(device_context_list);
    }
    devices = new Device*[num];

    for (int i = 0; i < num; ++i) {
      if (cct->_conf->ms_async_rdma_cm) {
        devices[i] = new Device(cct, device_context_list[i]);
      } else {
        devices[i] = new Device(cct, device_list[i]);
      }
    }
  }

  ~DeviceList() {
    for (int i = 0; i < num; ++i) {
      delete devices[i];
    }
    delete[] devices;
    ibv_free_device_list(device_list);
    rdma_free_devices(device_context_list);
  }

  // returns the device whose name matches device_name, or the first
  // enumerated device when device_name is empty
  Device* get_device(const char* device_name) {
    for (int i = 0; i < num; ++i) {
      if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
        return devices[i];
      }
    }
    return nullptr;
  }
};
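
// Illustrative lookup sketch (not from the original header; the option name
// is an assumption): an empty string selects the first enumerated HCA,
// otherwise the name must match a device such as "mlx5_0".
//
//   DeviceList devs(cct);
//   Device *d = devs.get_device(cct->_conf->ms_async_rdma_device_name.c_str());
//   if (!d) { /* no such device */ }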

enum {
  l_msgr_rdma_dispatcher_first = 94000,

  l_msgr_rdma_inflight_tx_chunks,
  l_msgr_rdma_rx_bufs_in_use,
  l_msgr_rdma_rx_bufs_total,

  l_msgr_rdma_tx_total_wc,
  l_msgr_rdma_tx_total_wc_errors,
  l_msgr_rdma_tx_wc_retry_errors,
  l_msgr_rdma_tx_wc_wr_flush_errors,

  l_msgr_rdma_rx_total_wc,
  l_msgr_rdma_rx_total_wc_errors,

  l_msgr_rdma_handshake_errors,

  l_msgr_rdma_total_async_events,
  l_msgr_rdma_async_last_wqe_events,

  l_msgr_rdma_created_queue_pair,
  l_msgr_rdma_active_queue_pair,

  l_msgr_rdma_dispatcher_last,
};

enum {
  l_msgr_rdma_first = 95000,

  l_msgr_rdma_tx_no_mem,
  l_msgr_rdma_tx_parital_mem,
  l_msgr_rdma_tx_failed,

  l_msgr_rdma_tx_chunks,
  l_msgr_rdma_tx_bytes,
  l_msgr_rdma_rx_chunks,
  l_msgr_rdma_rx_bytes,
  l_msgr_rdma_pending_sent_conns,

  l_msgr_rdma_last,
};

class RDMADispatcher;

class Infiniband {
 public:
  class ProtectionDomain {
   public:
    explicit ProtectionDomain(CephContext *cct, Device *device);
    ~ProtectionDomain();

    ibv_pd* const pd;
  };

  class QueuePair;
  class MemoryManager {
   public:
    class Chunk {
     public:
      Chunk(ibv_mr* m, uint32_t bytes, char* buffer, uint32_t offset = 0,
            uint32_t bound = 0, uint32_t lkey = 0, QueuePair* qp = nullptr);
      ~Chunk();

      uint32_t get_offset();
      uint32_t get_size() const;
      void prepare_read(uint32_t b);
      uint32_t get_bound();
      uint32_t read(char* buf, uint32_t len);
      uint32_t write(char* buf, uint32_t len);
      void reset_read_chunk();
      void reset_write_chunk();
      void set_qp(QueuePair *qp) { this->qp = qp; }
      void clear_qp() { set_qp(nullptr); }
      QueuePair* get_qp() { return qp; }

     public:
      ibv_mr* mr;
      QueuePair* qp;
      char* buffer; // TODO: remove buffer/refactor TX
    };

    class Cluster {
     public:
      Cluster(MemoryManager& m, uint32_t s);
      ~Cluster();

      int fill(uint32_t num);
      void take_back(std::vector<Chunk*> &ck);
      int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
      // chunks and buffers live in parallel arrays, so a buffer address
      // maps back to its Chunk by index arithmetic
      Chunk *get_chunk_by_buffer(const char *c) {
        uint32_t idx = (c - base) / buffer_size;
        Chunk *chunk = chunk_base + idx;
        return chunk;
      }
      bool is_my_buffer(const char *c) const {
        return c >= base && c < end;
      }

      MemoryManager& manager;
      uint32_t buffer_size;
      uint32_t num_chunk = 0;
      ceph::mutex lock = ceph::make_mutex("cluster_lock");
      std::vector<Chunk*> free_chunks;
      char *base = nullptr;
      char *end = nullptr;
      Chunk* chunk_base = nullptr;
    };

    class MemPoolContext {
      PerfCounters *perf_logger;

     public:
      MemoryManager *manager;
      unsigned n_bufs_allocated;
      explicit MemPoolContext(MemoryManager *m) :
        perf_logger(nullptr),
        manager(m),
        n_bufs_allocated(0) {}
      // true if it is still possible to allocate more memory for the pool
      bool can_alloc(unsigned nbufs);
      void update_stats(int val);
      void set_stat_logger(PerfCounters *logger);
    };

    class PoolAllocator {
     public:
      typedef std::size_t size_type;
      typedef std::ptrdiff_t difference_type;

      static char *malloc(const size_type bytes);
      static void free(char * const block);

      // runs func with g_ctx pointing at ctx, so that malloc()/free() can
      // reach per-pool state through boost's otherwise stateless allocator
      // interface; the scope_guard clears g_ctx again even if func throws
      template<typename Func>
      static std::invoke_result_t<Func> with_context(MemPoolContext* ctx,
                                                     Func&& func) {
        std::lock_guard l{get_lock()};
        g_ctx = ctx;
        scope_guard reset_ctx{[] { g_ctx = nullptr; }};
        return std::move(func)();
      }

     private:
      static ceph::mutex& get_lock();
      static MemPoolContext* g_ctx;
    };
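
    // Illustrative use only (not from the original header; `some_pool` and
    // `some_ctx` are hypothetical names): a caller that wants the pool's
    // bookkeeping attributed to a particular MemPoolContext wraps the
    // allocation in with_context:
    //
    //   void *buf = PoolAllocator::with_context(&some_ctx, [&] {
    //     return some_pool.malloc();
    //   });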

    /**
     * modify boost pool so that it is possible to
     * have a thread safe 'context' when allocating/freeing
     * the memory. It is needed to allow different pool
     * configurations and bookkeeping per CephContext, and
     * also to be able to use the same allocator for both
     * the RX and TX pools.
     * TODO: use boost pool to allocate TX chunks too
     */
    class mem_pool : public boost::pool<PoolAllocator> {
     private:
      MemPoolContext *ctx;
      void *slow_malloc();

     public:
      ceph::mutex lock = ceph::make_mutex("mem_pool_lock");
      explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size,
                        const size_type nnext_size = 32,
                        const size_type nmax_size = 0) :
        pool(nrequested_size, nnext_size, nmax_size),
        ctx(ctx) {}

      void *malloc() {
        if (!store().empty())
          return (store().malloc)();
        // need to alloc more memory...
        return slow_malloc();
      }
    };

    MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
    ~MemoryManager();

    void* malloc(size_t size);
    void free(void *ptr);

    void create_tx_pool(uint32_t size, uint32_t tx_num);
    void return_tx(std::vector<Chunk*> &chunks);
    int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
    bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
    Chunk *get_tx_chunk_by_buffer(const char *c) {
      return send->get_chunk_by_buffer(c);
    }
    uint32_t get_tx_buffer_size() const {
      return send->buffer_size;
    }

    Chunk *get_rx_buffer() {
      std::lock_guard l{rxbuf_pool.lock};
      return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
    }

    void release_rx_buffer(Chunk *chunk) {
      std::lock_guard l{rxbuf_pool.lock};
      chunk->clear_qp();
      rxbuf_pool.free(chunk);
    }

    void set_rx_stat_logger(PerfCounters *logger) {
      rxbuf_pool_ctx.set_stat_logger(logger);
    }
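
    // Illustrative RX buffer lifecycle (not from the original header; `mm` is
    // a hypothetical MemoryManager pointer): a dispatcher takes a chunk from
    // the pool, posts it as a receive work request, and returns it once the
    // completion has been consumed.
    //
    //   Chunk *c = mm->get_rx_buffer();   // may be null if the pool
    //                                     // cannot grow any further
    //   ... post c to a receive queue, read out of it on an RX completion ...
    //   mm->release_rx_buffer(c);         // clears the QP link, recycles it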

    CephContext *cct;

   private:
    // TODO: Cluster -> TxPool txbuf_pool
    Cluster* send = nullptr; // SEND
    Device *device;
    ProtectionDomain *pd;
    MemPoolContext rxbuf_pool_ctx;
    mem_pool rxbuf_pool;

    void* huge_pages_malloc(size_t size);
    void huge_pages_free(void *ptr);
  };

 private:
  uint32_t tx_queue_len = 0;
  uint32_t rx_queue_len = 0;
  uint32_t max_sge = 0;
  uint8_t ib_physical_port = 0;
  MemoryManager* memory_manager = nullptr;
  ibv_srq* srq = nullptr;             // shared receive work queue
  Device *device = NULL;
  ProtectionDomain *pd = NULL;
  DeviceList *device_list = nullptr;
  ceph::mutex lock = ceph::make_mutex("IB lock");
  bool initialized = false;
  const std::string &device_name;
  bool support_srq = false;

 public:
  explicit Infiniband(CephContext *c);
  ~Infiniband();
  void init();
  static void verify_prereq(CephContext *cct);

  class CompletionChannel {
    static const uint32_t MAX_ACK_EVENT = 5000;
    CephContext *cct;
    Infiniband& infiniband;
    ibv_comp_channel *channel;
    ibv_cq *cq;
    uint32_t cq_events_that_need_ack;

   public:
    CompletionChannel(CephContext *c, Infiniband &ib);
    ~CompletionChannel();
    int init();
    bool get_cq_event();
    int get_fd() { return channel->fd; }
    ibv_comp_channel* get_channel() { return channel; }
    void bind_cq(ibv_cq *c) { cq = c; }
    void ack_events();
  };
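
  // Illustrative event-loop integration (not from the original header;
  // `watch_fd` is a hypothetical hook): the channel's fd can be watched by an
  // epoll-style loop, and completion events are batched before being
  // acknowledged to avoid a syscall per event.
  //
  //   CompletionChannel cc(cct, ib);
  //   cc.init();
  //   watch_fd(cc.get_fd());
  //   if (cc.get_cq_event()) { /* poll the bound CQ */ }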

  // this class encapsulates the creation, use, and destruction of an RC
  // completion queue.
  //
  // You need to call init(); it will create a CQ and associate it with the
  // completion channel.
  class CompletionQueue {
   public:
    CompletionQueue(CephContext *c, Infiniband &ib,
                    const uint32_t qd, CompletionChannel *cc)
      : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
    ~CompletionQueue();
    int init();
    int poll_cq(int num_entries, ibv_wc *ret_wc_array);

    ibv_cq* get_cq() const { return cq; }
    int rearm_notify(bool solicited_only=true);
    CompletionChannel* get_cc() const { return channel; }

   private:
    CephContext *cct;
    Infiniband& infiniband;      // Infiniband to which this CQ belongs
    CompletionChannel *channel;
    ibv_cq *cq;
    uint32_t queue_depth;
  };
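
  // Illustrative polling sketch (not from the original header): re-arm
  // notifications, then drain the CQ until poll_cq() returns fewer entries
  // than requested.
  //
  //   cq->rearm_notify();
  //   ibv_wc wc[32];
  //   int n = cq->poll_cq(32, wc);   // number of completions retrieved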

  // this class encapsulates the creation, use, and destruction of an RC
  // queue pair.
  //
  // you need to call init(); it will create a qp and bring it to the INIT
  // state. after obtaining the lid, qpn, and psn of a remote queue pair, one
  // must call plumb() to bring the queue pair to the RTS state.
  class QueuePair {
   public:
    typedef MemoryManager::Chunk Chunk;
    QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
              int ib_physical_port, ibv_srq *srq,
              Infiniband::CompletionQueue* txcq,
              Infiniband::CompletionQueue* rxcq,
              uint32_t tx_queue_len, uint32_t max_recv_wr,
              struct rdma_cm_id *cid, uint32_t q_key = 0);
    ~QueuePair();

    // state transitions used to bring the QP from RESET up to RTS
    // (see the lifecycle sketch after this class)
    int modify_qp_to_error();
    int modify_qp_to_rts();
    int modify_qp_to_rtr();
    int modify_qp_to_init();
    int init();

    /**
     * Get the initial packet sequence number for this QueuePair.
     * This is randomly generated on creation. It should not be confused
     * with the remote side's PSN, which is set in #plumb().
     */
    uint32_t get_initial_psn() const { return initial_psn; }
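
    // Note (illustrative, not from the original header): PSNs are 24-bit
    // values, so an initial PSN is typically a random number masked down to
    // that width, e.g. `lrand48() & PSN_MSK`.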

    /**
     * Get the local queue pair number for this QueuePair.
     * QPNs are analogous to UDP/TCP port numbers.
     */
    uint32_t get_local_qp_number() const { return qp->qp_num; }
    /**
     * Get the remote queue pair number for this QueuePair, as set in #plumb().
     * QPNs are analogous to UDP/TCP port numbers.
     */
    int get_remote_qp_number(uint32_t *rqp) const;
    /**
     * Get the remote infiniband address for this QueuePair, as set in #plumb().
     * LIDs are "local IDs" in infiniband terminology. They are short, locally
     * routable addresses.
     */
    int get_remote_lid(uint16_t *lid) const;
    /**
     * Get the state of a QueuePair.
     */
    int get_state() const;
    /*
     * send/receive connection management meta data
     */
    int send_cm_meta(CephContext *cct, int socket_fd);
    int recv_cm_meta(CephContext *cct, int socket_fd);
    void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
    void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
    ibv_qp* get_qp() const { return qp; }
    Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
    Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
    bool is_dead() const { return dead; }
    ib_cm_meta_t& get_peer_cm_meta() { return peer_cm_meta; }
    ib_cm_meta_t& get_local_cm_meta() { return local_cm_meta; }
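
    // Illustrative handshake flow (not from the original header; `tcp_fd` is
    // a hypothetical connected socket): before the QP can be plumbed, both
    // sides swap their ib_cm_meta_t over an ordinary TCP connection, with
    // GIDs flattened to hex strings by gid_to_wire_gid().
    //
    //   qp->send_cm_meta(cct, tcp_fd);   // push local lid/qpn/psn/gid
    //   qp->recv_cm_meta(cct, tcp_fd);   // fill peer_cm_meta from the peer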

    // receive WRs are only tracked per-QP when no SRQ is in use
    void add_rq_wr(Chunk* chunk) {
      if (srq) return;

      std::lock_guard l{lock};
      recv_queue.push_back(chunk);
    }

    void remove_rq_wr(Chunk* chunk) {
      if (srq) return;

      std::lock_guard l{lock};
      auto it = std::find(recv_queue.begin(), recv_queue.end(), chunk);
      ceph_assert(it != recv_queue.end());
      recv_queue.erase(it);
    }
    ibv_srq* get_srq() const { return srq; }

   private:
    CephContext *cct;
    Infiniband& infiniband;      // Infiniband to which this QP belongs
    ibv_qp_type type;            // QP type (IBV_QPT_RC, etc.)
    ibv_context* ctxt;           // device context of the HCA to use
    int ib_physical_port;
    ibv_pd* pd;                  // protection domain
    ibv_srq* srq;                // shared receive queue
    ibv_qp* qp;                  // infiniband verbs QP handle
    struct rdma_cm_id *cm_id;
    ib_cm_meta_t peer_cm_meta;
    ib_cm_meta_t local_cm_meta;
    Infiniband::CompletionQueue* txcq;
    Infiniband::CompletionQueue* rxcq;
    uint32_t initial_psn;        // initial packet sequence number
    uint32_t max_send_wr;
    uint32_t max_recv_wr;
    uint32_t q_key;
    bool dead;
    std::vector<Chunk*> recv_queue;
    ceph::mutex lock = ceph::make_mutex("queue_pair_lock");
  };
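
  // Illustrative QP lifecycle (not from the original header); plumb() is the
  // step named in the class comment above:
  //
  //   qp->init();               // RESET -> INIT, receives can be posted
  //   ... exchange cm meta with the peer over TCP ...
  //   qp->modify_qp_to_rtr();   // INIT -> RTR, QP can receive
  //   qp->modify_qp_to_rts();   // RTR -> RTS, QP can send
  //   ... on teardown or a fatal work completion error ...
  //   qp->modify_qp_to_error();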

 public:
  typedef MemoryManager::Cluster Cluster;
  typedef MemoryManager::Chunk Chunk;
  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*,
                               ibv_qp_type type, struct rdma_cm_id *cm_id);
  ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
  // post rx buffers to srq, return number of buffers actually posted
  int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
  void post_chunk_to_pool(Chunk* chunk) {
    QueuePair *qp = chunk->get_qp();
    if (qp != nullptr) {
      qp->remove_rq_wr(chunk);
    }
    get_memory_manager()->release_rx_buffer(chunk);
  }
  int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
  CompletionChannel *create_comp_channel(CephContext *c);
  CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
  uint8_t get_ib_physical_port() { return ib_physical_port; }
  uint16_t get_lid() { return device->get_lid(); }
  ibv_gid get_gid() { return device->get_gid(); }
  MemoryManager* get_memory_manager() { return memory_manager; }
  Device* get_device() { return device; }
  int get_async_fd() { return device->ctxt->async_fd; }
  bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c); }
  Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
  static const char* wc_status_to_string(int status);
  static const char* qp_state_string(int status);
  uint32_t get_rx_queue_len() const { return rx_queue_len; }
};

#endif