]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/rdma/Infiniband.h
7394ca92bad78c468a1eefab0abebcbfd0c576f5
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_INFINIBAND_H
18 #define CEPH_INFINIBAND_H
20 #include <infiniband/verbs.h>
25 #include "include/int_types.h"
26 #include "include/page.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "msg/msg_types.h"
30 #include "msg/async/net_handler.h"
31 #include "common/Mutex.h"
// Huge-page size constant: 2 * 1024 * 1024 (presumably bytes, i.e. 2 MiB).
#define HUGE_PAGE_SIZE (2 * 1024 * 1024)
// Rounds x up to the next multiple of HUGE_PAGE_SIZE using integer
// arithmetic; a value that is already a multiple is returned unchanged.
#define ALIGN_TO_PAGE_SIZE(x) \
  (((x) + HUGE_PAGE_SIZE - 1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)
49 } __attribute__((packed
));
60 class ProtectionDomain
{
62 explicit ProtectionDomain(CephContext
*cct
, Device
*device
);
73 Chunk(ibv_mr
* m
, uint32_t len
, char* b
);
76 void set_offset(uint32_t o
);
77 uint32_t get_offset();
78 void set_bound(uint32_t b
);
79 void prepare_read(uint32_t b
);
81 uint32_t read(char* buf
, uint32_t len
);
82 uint32_t write(char* buf
, uint32_t len
);
97 Cluster(MemoryManager
& m
, uint32_t s
);
100 int fill(uint32_t num
);
101 void take_back(std::vector
<Chunk
*> &ck
);
102 int get_buffers(std::vector
<Chunk
*> &chunks
, size_t bytes
);
103 Chunk
*get_chunk_by_buffer(const char *c
) {
104 uint32_t idx
= (c
- base
) / buffer_size
;
105 Chunk
*chunk
= chunk_base
+ idx
;
108 bool is_my_buffer(const char *c
) const {
109 return c
>= base
&& c
< end
;
112 MemoryManager
& manager
;
113 uint32_t buffer_size
;
116 std::vector
<Chunk
*> free_chunks
;
117 char *base
= nullptr;
119 Chunk
* chunk_base
= nullptr;
122 MemoryManager(Device
*d
, ProtectionDomain
*p
, bool hugepage
);
125 void* malloc_huge_pages(size_t size
);
126 void free_huge_pages(void *ptr
);
127 void register_rx_tx(uint32_t size
, uint32_t rx_num
, uint32_t tx_num
);
128 void return_tx(std::vector
<Chunk
*> &chunks
);
129 int get_send_buffers(std::vector
<Chunk
*> &c
, size_t bytes
);
130 int get_channel_buffers(std::vector
<Chunk
*> &chunks
, size_t bytes
);
131 bool is_tx_buffer(const char* c
) { return send
->is_my_buffer(c
); }
132 bool is_rx_buffer(const char* c
) { return channel
->is_my_buffer(c
); }
133 Chunk
*get_tx_chunk_by_buffer(const char *c
) {
134 return send
->get_chunk_by_buffer(c
);
136 uint32_t get_tx_buffer_size() const {
137 return send
->buffer_size
;
140 bool enabled_huge_page
;
143 Cluster
* channel
;//RECV
144 Cluster
* send
;// SEND
146 ProtectionDomain
*pd
;
152 bool initialized
= false;
153 DeviceList
*device_list
= nullptr;
154 RDMADispatcher
*dispatcher
= nullptr;
157 explicit Infiniband(CephContext
*c
);
161 void set_dispatcher(RDMADispatcher
*d
);
163 class CompletionChannel
{
164 static const uint32_t MAX_ACK_EVENT
= 5000;
167 ibv_comp_channel
*channel
;
169 uint32_t cq_events_that_need_ack
;
172 CompletionChannel(CephContext
*c
, Device
&ibdev
);
173 ~CompletionChannel();
176 int get_fd() { return channel
->fd
; }
177 ibv_comp_channel
* get_channel() { return channel
; }
178 void bind_cq(ibv_cq
*c
) { cq
= c
; }
// This class encapsulates the creation, use, and destruction of an RC
// completion queue.
//
// You need to call init, which creates a cq and associates it with the
// completion channel.
186 class CompletionQueue
{
188 CompletionQueue(CephContext
*c
, Device
&ibdev
,
189 const uint32_t qd
, CompletionChannel
*cc
)
190 : cct(c
), ibdev(ibdev
), channel(cc
), cq(NULL
), queue_depth(qd
) {}
193 int poll_cq(int num_entries
, ibv_wc
*ret_wc_array
);
195 ibv_cq
* get_cq() const { return cq
; }
// Re-arms completion notification; solicited_only defaults to true.
// Returns an int status (meaning defined out of line).
int rearm_notify(bool solicited_only = true);
197 CompletionChannel
* get_cc() const { return channel
; }
201 CompletionChannel
*channel
;
203 uint32_t queue_depth
;
// This class encapsulates the creation, use, and destruction of an RC
// queue pair.
//
// You need to call init, which creates a qp and brings it to the INIT state.
// After obtaining the lid, qpn, and psn of a remote queue pair, one
// must call plumb() to bring the queue pair to the RTS state.
214 QueuePair(CephContext
*c
, Device
&device
, ibv_qp_type type
,
215 int ib_physical_port
, ibv_srq
*srq
,
216 Infiniband::CompletionQueue
* txcq
,
217 Infiniband::CompletionQueue
* rxcq
,
218 uint32_t max_send_wr
, uint32_t max_recv_wr
, uint32_t q_key
= 0);
224 * Get the initial packet sequence number for this QueuePair.
225 * This is randomly generated on creation. It should not be confused
226 * with the remote side's PSN, which is set in #plumb().
228 uint32_t get_initial_psn() const { return initial_psn
; };
230 * Get the local queue pair number for this QueuePair.
231 * QPNs are analogous to UDP/TCP port numbers.
233 uint32_t get_local_qp_number() const { return qp
->qp_num
; };
235 * Get the remote queue pair number for this QueuePair, as set in #plumb().
236 * QPNs are analogous to UDP/TCP port numbers.
238 int get_remote_qp_number(uint32_t *rqp
) const;
240 * Get the remote infiniband address for this QueuePair, as set in #plumb().
241 * LIDs are "local IDs" in infiniband terminology. They are short, locally
242 * routable addresses.
244 int get_remote_lid(uint16_t *lid
) const;
246 * Get the state of a QueuePair.
248 int get_state() const;
250 * Return true if the queue pair is in an error state, false otherwise.
252 bool is_error() const;
253 ibv_qp
* get_qp() const { return qp
; }
254 Infiniband::CompletionQueue
* get_tx_cq() const { return txcq
; }
255 Infiniband::CompletionQueue
* get_rx_cq() const { return rxcq
; }
257 bool is_dead() const { return dead
; }
261 Device
&ibdev
; // Infiniband to which this QP belongs
262 ibv_qp_type type
; // QP type (IBV_QPT_RC, etc.)
263 ibv_context
* ctxt
; // device context of the HCA to use
264 int ib_physical_port
;
265 ibv_pd
* pd
; // protection domain
266 ibv_srq
* srq
; // shared receive queue
267 ibv_qp
* qp
; // infiniband verbs QP handle
268 Infiniband::CompletionQueue
* txcq
;
269 Infiniband::CompletionQueue
* rxcq
;
270 uint32_t initial_psn
; // initial packet sequence number
271 uint32_t max_send_wr
;
272 uint32_t max_recv_wr
;
// Static helpers mapping an integer status to a C string: one for work
// completion status codes, one for queue pair states (per their names).
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
281 void handle_pre_fork();
283 Device
* get_device(const char* device_name
);
284 Device
* get_device(const struct ibv_context
*ctxt
);
286 int poll_tx(int n
, Device
**d
, ibv_wc
*wc
);
287 int poll_rx(int n
, Device
**d
, ibv_wc
*wc
);
288 int poll_blocking(bool &done
);
290 void handle_async_event();
291 RDMADispatcher
*get_dispatcher() { return dispatcher
; }
294 inline ostream
& operator<<(ostream
& out
, const Infiniband::QueuePair
&qp
)
296 return out
<< qp
.get_local_qp_number();