1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include "Infiniband.h"
18 #include "RDMAStack.h"
19 #include "Device.h"
20
21 #include "common/errno.h"
22 #include "common/debug.h"
23
24 #define dout_subsys ceph_subsys_ms
25 #undef dout_prefix
26 #define dout_prefix *_dout << "Infiniband "
27
28 static const uint32_t MAX_INLINE_DATA = 0;
29
30 Infiniband::QueuePair::QueuePair(
31 CephContext *c, Device &device, ibv_qp_type type,
32 int port, ibv_srq *srq,
33 Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
34 uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
35 : cct(c), ibdev(device),
36 type(type),
37 ctxt(ibdev.ctxt),
38 ib_physical_port(port),
39 pd(ibdev.pd->pd),
40 srq(srq),
41 qp(NULL),
42 txcq(txcq),
43 rxcq(rxcq),
44 initial_psn(0),
45 max_send_wr(max_send_wr),
46 max_recv_wr(max_recv_wr),
47 q_key(q_key),
48 dead(false)
49 {
50 initial_psn = lrand48() & 0xffffff;
  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
    lderr(cct) << __func__ << " invalid queue pair type: " << (int)type << dendl;
    ceph_abort();
  }
55 }
56
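// Create the verbs queue pair and move it from RESET to INIT: bind the
// send/recv completion queues and the shared receive queue, then set the
// physical port and, depending on the QP type, the pkey index, access flags
// and qkey via ibv_modify_qp().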
57 int Infiniband::QueuePair::init()
58 {
59 ldout(cct, 20) << __func__ << " started." << dendl;
60 ibv_qp_init_attr qpia;
61 memset(&qpia, 0, sizeof(qpia));
62 qpia.send_cq = txcq->get_cq();
63 qpia.recv_cq = rxcq->get_cq();
64 qpia.srq = srq; // use the same shared receive queue
65 qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
66 qpia.cap.max_send_sge = 1; // max send scatter-gather elements
67 qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
  qpia.qp_type = type;                      // RC, UD, or RAW_PACKET
69 qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
70
  qp = ibv_create_qp(pd, &qpia);
  if (qp == NULL) {
    lderr(cct) << __func__ << " failed to create queue pair: " << cpp_strerror(errno) << dendl;
    if (errno == ENOMEM) {
      lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers,"
                    " ms_async_rdma_send_buffers or"
                    " ms_async_rdma_buffer_size" << dendl;
    }
    return -1;
  }

  ldout(cct, 20) << __func__ << " successfully created queue pair: "
                 << "qp=" << qp << dendl;
84
85 // move from RESET to INIT state
86 ibv_qp_attr qpa;
87 memset(&qpa, 0, sizeof(qpa));
88 qpa.qp_state = IBV_QPS_INIT;
89 qpa.pkey_index = 0;
90 qpa.port_num = (uint8_t)(ib_physical_port);
91 qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
92 qpa.qkey = q_key;
93
94 int mask = IBV_QP_STATE | IBV_QP_PORT;
95 switch (type) {
96 case IBV_QPT_RC:
97 mask |= IBV_QP_ACCESS_FLAGS;
98 mask |= IBV_QP_PKEY_INDEX;
99 break;
100 case IBV_QPT_UD:
101 mask |= IBV_QP_QKEY;
102 mask |= IBV_QP_PKEY_INDEX;
103 break;
104 case IBV_QPT_RAW_PACKET:
105 break;
106 default:
107 ceph_abort();
108 }
109
  int ret = ibv_modify_qp(qp, &qpa, mask);
  if (ret) {
    ibv_destroy_qp(qp);
    qp = NULL;
    lderr(cct) << __func__ << " failed to transition to INIT state: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " successfully changed queue pair to INIT:"
                 << " qp=" << qp << dendl;
119 return 0;
120 }
121
/**
 * Switch an RC QueuePair into the ERROR state. This is necessary because a
 * Queue Pair must be moved into the ERROR state and have all of its relevant
 * Work Completions polled before it is destroyed: destroying a Queue Pair
 * does not guarantee that its Work Completions are removed from the CQ, and
 * even if they are already in the CQ it might not be possible to retrieve
 * them afterwards. If the Queue Pair is associated with an SRQ, it is
 * recommended to wait for the affiliated IBV_EVENT_QP_LAST_WQE_REACHED event
 * before destroying it.
 *
 * \return
 *      -errno if the QueuePair can't switch to ERROR
 *      0 for success.
 */
136 int Infiniband::QueuePair::to_dead()
137 {
138 if (dead)
139 return 0;
140 ibv_qp_attr qpa;
141 memset(&qpa, 0, sizeof(qpa));
142 qpa.qp_state = IBV_QPS_ERR;
143
144 int mask = IBV_QP_STATE;
145 int ret = ibv_modify_qp(qp, &qpa, mask);
146 if (ret) {
147 lderr(cct) << __func__ << " failed to transition to ERROR state: "
148 << cpp_strerror(errno) << dendl;
149 return -errno;
150 }
151 dead = true;
152 return ret;
153 }
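
// A caller tearing down a connection would typically do something along these
// lines (a sketch only; the exact teardown sequence lives in the RDMA stack
// code that owns the QueuePair, not here):
//
//   qp->to_dead();      // outstanding WRs complete with IBV_WC_WR_FLUSH_ERR
//   // ... keep polling the tx/rx CQs until no completion references this QP ...
//   delete qp;          // now safe: ~QueuePair() calls ibv_destroy_qp()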
154
155 int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
156 {
157 ibv_qp_attr qpa;
158 ibv_qp_init_attr qpia;
159
160 int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
161 if (r) {
162 lderr(cct) << __func__ << " failed to query qp: "
163 << cpp_strerror(errno) << dendl;
164 return -1;
165 }
166
167 if (rqp)
168 *rqp = qpa.dest_qp_num;
169 return 0;
170 }
171
/**
 * Get the remote InfiniBand LID for this QueuePair, as set when the queue
 * pair was connected. LIDs are "local IDs" in InfiniBand terminology: short,
 * locally routable addresses.
 */
177 int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
178 {
179 ibv_qp_attr qpa;
180 ibv_qp_init_attr qpia;
181
182 int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
183 if (r) {
184 lderr(cct) << __func__ << " failed to query qp: "
185 << cpp_strerror(errno) << dendl;
186 return -1;
187 }
188
189 if (lid)
190 *lid = qpa.ah_attr.dlid;
191 return 0;
192 }
193
194 /**
195 * Get the state of a QueuePair.
196 */
197 int Infiniband::QueuePair::get_state() const
198 {
199 ibv_qp_attr qpa;
200 ibv_qp_init_attr qpia;
201
202 int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
203 if (r) {
204 lderr(cct) << __func__ << " failed to get state: "
205 << cpp_strerror(errno) << dendl;
206 return -1;
207 }
208 return qpa.qp_state;
209 }
210
211 /**
212 * Return true if the queue pair is in an error state, false otherwise.
213 */
214 bool Infiniband::QueuePair::is_error() const
215 {
216 ibv_qp_attr qpa;
217 ibv_qp_init_attr qpia;
218
219 int r = ibv_query_qp(qp, &qpa, -1, &qpia);
220 if (r) {
221 lderr(cct) << __func__ << " failed to get state: "
222 << cpp_strerror(errno) << dendl;
223 return true;
224 }
225 return qpa.cur_qp_state == IBV_QPS_ERR;
226 }
227
228
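// A CompletionChannel wraps an ibv_comp_channel: a file descriptor the event
// loop can poll to learn that the completion queue bound to it has new
// entries pending.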
229 Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev)
230 : cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
231 {
232 }
233
Infiniband::CompletionChannel::~CompletionChannel()
{
  if (channel) {
    int r = ibv_destroy_comp_channel(channel);
    if (r != 0)
      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(r) << dendl;
    assert(r == 0);
  }
}
243
244 int Infiniband::CompletionChannel::init()
245 {
246 ldout(cct, 20) << __func__ << " started." << dendl;
247 channel = ibv_create_comp_channel(ibdev.ctxt);
248 if (!channel) {
249 lderr(cct) << __func__ << " failed to create receive completion channel: "
250 << cpp_strerror(errno) << dendl;
251 return -1;
252 }
  int rc = NetHandler(cct).set_nonblock(channel->fd);
  if (rc < 0) {
    ibv_destroy_comp_channel(channel);
    channel = NULL;
    return -1;
  }
258 return 0;
259 }
260
261 void Infiniband::CompletionChannel::ack_events()
262 {
263 ibv_ack_cq_events(cq, cq_events_that_need_ack);
264 cq_events_that_need_ack = 0;
265 }
266
267 bool Infiniband::CompletionChannel::get_cq_event()
268 {
269 ibv_cq *cq = NULL;
270 void *ev_ctx;
271 if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
272 if (errno != EAGAIN && errno != EINTR)
273 lderr(cct) << __func__ << " failed to retrieve CQ event: "
274 << cpp_strerror(errno) << dendl;
275 return false;
276 }
277
  /* accumulate the number of cq events that need to
   * be acked, and periodically ack them
   */
  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
    ldout(cct, 20) << __func__ << " ack cq events." << dendl;
    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
    cq_events_that_need_ack = 0;
  }
286
287 return true;
288 }
289
290
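// A CompletionQueue wraps an ibv_cq bound to a CompletionChannel. Work
// completions are harvested with poll_cq(), and event notification can be
// re-armed with rearm_notify().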
Infiniband::CompletionQueue::~CompletionQueue()
{
  if (cq) {
    int r = ibv_destroy_cq(cq);
    if (r != 0)
      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(r) << dendl;
    assert(r == 0);
  }
}
300
301 int Infiniband::CompletionQueue::init()
302 {
303 cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0);
304 if (!cq) {
305 lderr(cct) << __func__ << " failed to create receive completion queue: "
306 << cpp_strerror(errno) << dendl;
307 return -1;
308 }
309
310 if (ibv_req_notify_cq(cq, 0)) {
311 lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
312 ibv_destroy_cq(cq);
313 cq = nullptr;
314 return -1;
315 }
316
317 channel->bind_cq(cq);
  ldout(cct, 20) << __func__ << " successfully created cq=" << cq << dendl;
319 return 0;
320 }
321
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  int r = ibv_req_notify_cq(cq, 0);
  if (r)
    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(r) << dendl;
  return r;
}
330
int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
  if (r < 0) {
    lderr(cct) << __func__ << " failed to poll completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return r;
}
340
341
342 Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
343 : pd(ibv_alloc_pd(device->ctxt))
344 {
345 if (pd == NULL) {
346 lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
347 ceph_abort();
348 }
349 }
350
351 Infiniband::ProtectionDomain::~ProtectionDomain()
352 {
353 int rc = ibv_dealloc_pd(pd);
354 assert(rc == 0);
355 }
356
357
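// A Chunk is one fixed-size slice of a registered memory region (ibv_mr).
// `offset` tracks how much of it has been written or consumed, and `bound`
// marks the end of the valid data received into it.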
358 Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
359 : mr(m), bytes(len), offset(0), buffer(b)
360 {
361 }
362
Infiniband::MemoryManager::Chunk::~Chunk()
{
  int r = ibv_dereg_mr(mr);
  assert(r == 0);
}
367
368 void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
369 {
370 offset = o;
371 }
372
373 uint32_t Infiniband::MemoryManager::Chunk::get_offset()
374 {
375 return offset;
376 }
377
378 void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
379 {
380 bound = b;
381 }
382
383 void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
384 {
385 offset = 0;
386 bound = b;
387 }
388
389 uint32_t Infiniband::MemoryManager::Chunk::get_bound()
390 {
391 return bound;
392 }
393
394 uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
395 {
396 uint32_t left = bound - offset;
397 if (left >= len) {
398 memcpy(buf, buffer+offset, len);
399 offset += len;
400 return len;
401 } else {
402 memcpy(buf, buffer+offset, left);
403 offset = 0;
404 bound = 0;
405 return left;
406 }
407 }
408
409 uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
410 {
411 uint32_t left = bytes - offset;
412 if (left >= len) {
413 memcpy(buffer+offset, buf, len);
414 offset += len;
415 return len;
416 } else {
417 memcpy(buffer+offset, buf, left);
418 offset = bytes;
419 return left;
420 }
421 }
422
423 bool Infiniband::MemoryManager::Chunk::full()
424 {
425 return offset == bytes;
426 }
427
bool Infiniband::MemoryManager::Chunk::over()
{
  return offset == bound;
}
432
433 void Infiniband::MemoryManager::Chunk::clear()
434 {
435 offset = 0;
436 bound = 0;
437 }
438
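// A Cluster owns one large buffer carved into equally sized, individually
// registered Chunks. Free chunks are handed out by get_buffers() and given
// back with take_back().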
439 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
440 : manager(m), buffer_size(s), lock("cluster_lock")
441 {
442 }
443
444 Infiniband::MemoryManager::Cluster::~Cluster()
445 {
446 const auto chunk_end = chunk_base + num_chunk;
447 for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
448 chunk->~Chunk();
449 }
450
451 ::free(chunk_base);
452 if (manager.enabled_huge_page)
453 manager.free_huge_pages(base);
454 else
455 ::free(base);
456 }
457
458 int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
459 {
  assert(!base);
  num_chunk = num;
  uint32_t bytes = buffer_size * num;
  if (manager.enabled_huge_page) {
    base = (char*)manager.malloc_huge_pages(bytes);
  } else {
    base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
  }
  assert(base);
  end = base + bytes;
  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
  memset(chunk_base, 0, sizeof(Chunk) * num);
472 free_chunks.reserve(num);
473 Chunk* chunk = chunk_base;
474 for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
475 ibv_mr* m = ibv_reg_mr(manager.pd->pd, base+offset, buffer_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
476 assert(m);
477 new(chunk) Chunk(m, buffer_size, base+offset);
478 free_chunks.push_back(chunk);
479 chunk++;
480 }
481 return 0;
482 }
483
484 void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
485 {
486 Mutex::Locker l(lock);
487 for (auto c : ck) {
488 c->clear();
489 free_chunks.push_back(c);
490 }
491 }
492
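/*
 * Hand out enough free chunks to hold `bytes` bytes (bytes == 0 means take
 * every free chunk). Returns the number of chunks appended to `chunks`,
 * which may be fewer than requested when the cluster is running low.
 */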
493 int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
494 {
495 uint32_t num = bytes / buffer_size + 1;
496 if (bytes % buffer_size == 0)
497 --num;
498 int r = num;
499 Mutex::Locker l(lock);
500 if (free_chunks.empty())
501 return 0;
502 if (!bytes) {
503 r = free_chunks.size();
504 for (auto c : free_chunks)
505 chunks.push_back(c);
506 free_chunks.clear();
507 return r;
508 }
509 if (free_chunks.size() < num) {
510 num = free_chunks.size();
511 r = num;
512 }
513 for (uint32_t i = 0; i < num; ++i) {
514 chunks.push_back(free_chunks.back());
515 free_chunks.pop_back();
516 }
517 return r;
518 }
519
520
521 Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
522 : device(d), pd(p)
523 {
524 enabled_huge_page = hugepage;
525 }
526
527 Infiniband::MemoryManager::~MemoryManager()
528 {
529 if (channel)
530 delete channel;
531 if (send)
532 delete send;
533 }
534
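/*
 * Allocate `size` bytes, preferring anonymous huge pages. The mapping is
 * padded by one HUGE_PAGE_SIZE so the real mapping length can be stashed at
 * the start of the region; the caller receives the address just past that
 * header. free_huge_pages() reads the stashed length back to decide whether
 * to munmap() the huge-page mapping or free() the plain-malloc fallback
 * (which stores a length of 0).
 */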
535 void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
536 {
  size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,
                           MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
                           -1, 0);
539 if (ptr == MAP_FAILED) {
540 ptr = (char *)malloc(real_size);
541 if (ptr == NULL) return NULL;
542 real_size = 0;
543 }
544 *((size_t *)ptr) = real_size;
545 return ptr + HUGE_PAGE_SIZE;
546 }
547
548 void Infiniband::MemoryManager::free_huge_pages(void *ptr)
549 {
550 if (ptr == NULL) return;
  void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE;
552 size_t real_size = *((size_t *)real_ptr);
553 assert(real_size % HUGE_PAGE_SIZE == 0);
554 if (real_size != 0)
555 munmap(real_ptr, real_size);
556 else
557 free(real_ptr);
558 }
559
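// Create the receive ("channel") and send buffer pools: two Clusters of
// `size`-byte chunks registered against the protection domain, holding
// rx_num and tx_num chunks respectively.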
560 void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
561 {
562 assert(device);
563 assert(pd);
564 channel = new Cluster(*this, size);
565 channel->fill(rx_num);
566
567 send = new Cluster(*this, size);
568 send->fill(tx_num);
569 }
570
571 void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
572 {
573 send->take_back(chunks);
574 }
575
576 int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
577 {
578 return send->get_buffers(c, bytes);
579 }
580
581 int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
582 {
583 return channel->get_buffers(chunks, bytes);
584 }
585
586
587 Infiniband::Infiniband(CephContext *cct)
588 : cct(cct), lock("IB lock")
589 {
590 }
591
592 Infiniband::~Infiniband()
593 {
594 if (dispatcher)
595 dispatcher->polling_stop();
596
597 delete device_list;
598 }
599
600 void Infiniband::init()
601 {
602 Mutex::Locker l(lock);
603
604 if (initialized)
605 return;
606
607 device_list = new DeviceList(cct, this);
608 initialized = true;
609
610 dispatcher->polling_start();
611 }
612
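// The dispatcher may only be installed while none is set, or cleared while
// one is set; the xor assert below rejects every other combination.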
613 void Infiniband::set_dispatcher(RDMADispatcher *d)
614 {
615 assert(!d ^ !dispatcher);
616
617 dispatcher = d;
618 }
619
620 Device* Infiniband::get_device(const char* device_name)
621 {
622 return device_list->get_device(device_name);
623 }
624
625 Device *Infiniband::get_device(const struct ibv_context *ctxt)
626 {
627 return device_list->get_device(ctxt);
628 }
629
Infiniband::QueuePair::~QueuePair()
{
  if (qp) {
    ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
    int r = ibv_destroy_qp(qp);
    assert(r == 0);
  }
}
637
/**
 * Return a string representation of the `status' field from the Verbs
 * struct `ibv_wc'.
 *
 * \param[in] status
 *      The integer status obtained in ibv_wc.status.
 * \return
 *      A string corresponding to the given status.
 */
647 const char* Infiniband::wc_status_to_string(int status)
648 {
649 static const char *lookup[] = {
650 "SUCCESS",
651 "LOC_LEN_ERR",
652 "LOC_QP_OP_ERR",
653 "LOC_EEC_OP_ERR",
654 "LOC_PROT_ERR",
655 "WR_FLUSH_ERR",
656 "MW_BIND_ERR",
657 "BAD_RESP_ERR",
658 "LOC_ACCESS_ERR",
659 "REM_INV_REQ_ERR",
660 "REM_ACCESS_ERR",
661 "REM_OP_ERR",
662 "RETRY_EXC_ERR",
663 "RNR_RETRY_EXC_ERR",
664 "LOC_RDD_VIOL_ERR",
665 "REM_INV_RD_REQ_ERR",
666 "REM_ABORT_ERR",
667 "INV_EECN_ERR",
668 "INV_EEC_STATE_ERR",
669 "FATAL_ERR",
670 "RESP_TIMEOUT_ERR",
671 "GENERAL_ERR"
672 };
673
674 if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
675 return "<status out of range!>";
676 return lookup[status];
677 }
678
679 const char* Infiniband::qp_state_string(int status) {
680 switch(status) {
681 case IBV_QPS_RESET : return "IBV_QPS_RESET";
682 case IBV_QPS_INIT : return "IBV_QPS_INIT";
683 case IBV_QPS_RTR : return "IBV_QPS_RTR";
684 case IBV_QPS_RTS : return "IBV_QPS_RTS";
685 case IBV_QPS_SQD : return "IBV_QPS_SQD";
686 case IBV_QPS_SQE : return "IBV_QPS_SQE";
687 case IBV_QPS_ERR : return "IBV_QPS_ERR";
    default: return "<qp state out of range>";
689 }
690 }
691
692 void Infiniband::handle_pre_fork()
693 {
694 device_list->uninit();
695 }
696
697 int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
698 {
699 return device_list->poll_tx(n, d, wc);
700 }
701
702 int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc)
703 {
704 return device_list->poll_rx(n, d, wc);
705 }
706
707 int Infiniband::poll_blocking(bool &done)
708 {
709 return device_list->poll_blocking(done);
710 }
711
712 void Infiniband::rearm_notify()
713 {
714 device_list->rearm_notify();
715 }
716
717 void Infiniband::handle_async_event()
718 {
719 device_list->handle_async_event();
720 }