]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "Infiniband.h" | |
18 | #include "RDMAStack.h" | |
19 | #include "Device.h" | |
20 | #include "common/errno.h" | |
21 | #include "common/debug.h" | |
22 | ||
23 | #include <poll.h> | |
24 | ||
25 | #define dout_subsys ceph_subsys_ms | |
26 | #undef dout_prefix | |
27 | #define dout_prefix *_dout << "IBDevice " | |
28 | ||
29 | static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; | |
30 | static const uint32_t CQ_DEPTH = 30000; | |
31 | ||
32 | Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr) | |
33 | { | |
34 | #ifdef HAVE_IBV_EXP | |
35 | union ibv_gid cgid; | |
36 | struct ibv_exp_gid_attr gid_attr; | |
37 | bool malformed = false; | |
38 | ||
39 | ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl; | |
40 | int r = ibv_query_port(ctxt, port_num, port_attr); | |
41 | if (r == -1) { | |
42 | lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; | |
43 | ceph_abort(); | |
44 | } | |
45 | ||
46 | lid = port_attr->lid; | |
47 | ||
48 | // search for requested GID in GIDs table | |
49 | ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) | |
50 | << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; | |
51 | r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), | |
52 | "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" | |
53 | ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", | |
54 | &cgid.raw[ 0], &cgid.raw[ 1], | |
55 | &cgid.raw[ 2], &cgid.raw[ 3], | |
56 | &cgid.raw[ 4], &cgid.raw[ 5], | |
57 | &cgid.raw[ 6], &cgid.raw[ 7], | |
58 | &cgid.raw[ 8], &cgid.raw[ 9], | |
59 | &cgid.raw[10], &cgid.raw[11], | |
60 | &cgid.raw[12], &cgid.raw[13], | |
61 | &cgid.raw[14], &cgid.raw[15]); | |
62 | ||
63 | if (r != 16) { | |
64 | ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; | |
65 | malformed = true; | |
66 | } | |
67 | ||
68 | gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; | |
69 | ||
70 | for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) { | |
71 | r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); | |
72 | if (r) { | |
73 | lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; | |
74 | ceph_abort(); | |
75 | } | |
76 | r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); | |
77 | if (r) { | |
78 | lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; | |
79 | ceph_abort(); | |
80 | } | |
81 | ||
82 | if (malformed) break; // stay with gid_idx=0 | |
83 | if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && | |
84 | (memcmp(&gid, &cgid, 16) == 0) ) { | |
85 | ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; | |
86 | break; | |
87 | } | |
88 | } | |
89 | ||
90 | if (gid_idx == port_attr->gid_tbl_len) { | |
91 | lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; | |
92 | ceph_abort(); | |
93 | } | |
94 | #else | |
95 | int r = ibv_query_port(ctxt, port_num, port_attr); | |
96 | if (r == -1) { | |
97 | lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; | |
98 | ceph_abort(); | |
99 | } | |
100 | ||
101 | lid = port_attr->lid; | |
102 | r = ibv_query_gid(ctxt, port_num, 0, &gid); | |
103 | if (r) { | |
104 | lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl; | |
105 | ceph_abort(); | |
106 | } | |
107 | #endif | |
108 | } | |
109 | ||
Port::~Port()
{
  // port_attr is owned by this Port (allocated in the constructor).
  delete port_attr;
}
114 | ||
115 | ||
116 | Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d) | |
117 | : cct(cct), device(d), lock("ibdev_lock"), | |
118 | async_handler(new C_handle_cq_async(this)), infiniband(ib), | |
119 | device_attr(new ibv_device_attr) | |
120 | { | |
121 | if (device == NULL) { | |
122 | lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl; | |
123 | ceph_abort(); | |
124 | } | |
125 | name = ibv_get_device_name(device); | |
126 | ctxt = ibv_open_device(device); | |
127 | if (ctxt == NULL) { | |
128 | lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl; | |
129 | ceph_abort(); | |
130 | } | |
131 | int r = ibv_query_device(ctxt, device_attr); | |
132 | if (r == -1) { | |
133 | lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; | |
134 | ceph_abort(); | |
135 | } | |
136 | ||
137 | port_cnt = device_attr->phys_port_cnt; | |
138 | ports = new Port *[port_cnt + 1]; | |
139 | assert(ports); | |
140 | ||
141 | for (int i = 1; i <= port_cnt; i++) { | |
142 | ports[i] = new Port(cct, ctxt, i); | |
143 | assert(ports[i]); | |
144 | } | |
145 | ||
146 | tx_cc = create_comp_channel(cct); | |
147 | assert(tx_cc); | |
148 | ||
149 | rx_cc = create_comp_channel(cct); | |
150 | assert(rx_cc); | |
151 | ||
152 | assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0); | |
153 | } | |
154 | ||
155 | void Device::init(int ibport) | |
156 | { | |
157 | Mutex::Locker l(lock); | |
158 | ||
159 | verify_port(ibport); | |
160 | ||
161 | if (initialized) | |
162 | return; | |
163 | ||
164 | pd = new ProtectionDomain(cct, this); | |
165 | ||
166 | max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers); | |
167 | ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl; | |
168 | ||
169 | max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers); | |
170 | ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl; | |
171 | ||
172 | ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe | |
173 | << " completion entries" << dendl; | |
174 | ||
175 | memory_manager = new MemoryManager(this, pd, | |
176 | cct->_conf->ms_async_rdma_enable_hugepage); | |
177 | memory_manager->register_rx_tx( | |
178 | cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr); | |
179 | ||
180 | srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT); | |
181 | post_channel_cluster(); | |
182 | ||
183 | tx_cq = create_comp_queue(cct, tx_cc); | |
184 | assert(tx_cq); | |
185 | ||
186 | rx_cq = create_comp_queue(cct, rx_cc); | |
187 | assert(rx_cq); | |
188 | ||
189 | initialized = true; | |
190 | ||
191 | ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl; | |
192 | } | |
193 | ||
194 | void Device::uninit() | |
195 | { | |
196 | Mutex::Locker l(lock); | |
197 | ||
198 | if (!initialized) | |
199 | return; | |
200 | ||
201 | tx_cc->ack_events(); | |
202 | rx_cc->ack_events(); | |
203 | ||
204 | initialized = false; | |
205 | ||
206 | delete rx_cq; | |
207 | delete tx_cq; | |
208 | delete rx_cc; | |
209 | delete tx_cc; | |
210 | ||
211 | assert(ibv_destroy_srq(srq) == 0); | |
212 | delete memory_manager; | |
213 | delete pd; | |
214 | } | |
215 | ||
216 | Device::~Device() | |
217 | { | |
218 | delete async_handler; | |
219 | ||
220 | uninit(); | |
221 | ||
222 | for (int i = 1; i <= port_cnt; i++) | |
223 | delete ports[i]; | |
224 | delete[] ports; | |
225 | ||
226 | assert(ibv_close_device(ctxt) == 0); | |
227 | delete device_attr; | |
228 | } | |
229 | ||
230 | void Device::verify_port(int port_num) { | |
231 | if (port_num < 0 || port_num > port_cnt) { | |
232 | lderr(cct) << __func__ << " port not found" << dendl; | |
233 | ceph_abort(); | |
234 | } | |
235 | ||
236 | Port *port = ports[port_num]; | |
237 | ||
238 | if (port->get_port_attr()->state == IBV_PORT_ACTIVE) { | |
239 | ldout(cct, 1) << __func__ << " found active port " << port_num << dendl; | |
240 | } else { | |
241 | ldout(cct, 10) << __func__ << " port " << port_num << | |
242 | " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl; | |
243 | ceph_abort(); | |
244 | } | |
245 | } | |
246 | ||
247 | Port *Device::get_port(int ibport) | |
248 | { | |
249 | assert(ibport > 0 && ibport <= port_cnt); | |
250 | return ports[ibport]; | |
251 | } | |
252 | ||
253 | /** | |
254 | * Create a new QueuePair. This factory should be used in preference to | |
255 | * the QueuePair constructor directly, since this lets derivatives of | |
256 | * Infiniband, e.g. MockInfiniband (if it existed), | |
257 | * return mocked out QueuePair derivatives. | |
258 | * | |
259 | * \return | |
260 | * QueuePair on success or NULL if init fails | |
261 | * See QueuePair::QueuePair for parameter documentation. | |
262 | */ | |
263 | Infiniband::QueuePair* Device::create_queue_pair(int port, | |
264 | ibv_qp_type type) | |
265 | { | |
266 | Infiniband::QueuePair *qp = new QueuePair( | |
267 | cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr); | |
268 | if (qp->init()) { | |
269 | delete qp; | |
270 | return NULL; | |
271 | } | |
272 | return qp; | |
273 | } | |
274 | ||
275 | /** | |
276 | * Create a shared receive queue. This basically wraps the verbs call. | |
277 | * | |
278 | * \param[in] max_wr | |
279 | * The max number of outstanding work requests in the SRQ. | |
280 | * \param[in] max_sge | |
281 | * The max number of scatter elements per WR. | |
282 | * \return | |
283 | * A valid ibv_srq pointer, or NULL on error. | |
284 | */ | |
285 | ibv_srq* Device::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) | |
286 | { | |
287 | ibv_srq_init_attr sia; | |
288 | memset(&sia, 0, sizeof(sia)); | |
289 | sia.srq_context = ctxt; | |
290 | sia.attr.max_wr = max_wr; | |
291 | sia.attr.max_sge = max_sge; | |
292 | return ibv_create_srq(pd->pd, &sia); | |
293 | } | |
294 | ||
295 | Infiniband::CompletionChannel* Device::create_comp_channel(CephContext *c) | |
296 | { | |
297 | Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); | |
298 | if (cc->init()) { | |
299 | delete cc; | |
300 | return NULL; | |
301 | } | |
302 | return cc; | |
303 | } | |
304 | ||
305 | Infiniband::CompletionQueue* Device::create_comp_queue( | |
306 | CephContext *cct, CompletionChannel *cc) | |
307 | { | |
308 | Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( | |
309 | cct, *this, CQ_DEPTH, cc); | |
310 | if (cq->init()) { | |
311 | delete cq; | |
312 | return NULL; | |
313 | } | |
314 | return cq; | |
315 | } | |
316 | ||
317 | int Device::post_chunk(Chunk* chunk) | |
318 | { | |
319 | ibv_sge isge; | |
320 | isge.addr = reinterpret_cast<uint64_t>(chunk->buffer); | |
321 | isge.length = chunk->bytes; | |
322 | isge.lkey = chunk->mr->lkey; | |
323 | ibv_recv_wr rx_work_request; | |
324 | ||
325 | memset(&rx_work_request, 0, sizeof(rx_work_request)); | |
326 | rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr | |
327 | rx_work_request.next = NULL; | |
328 | rx_work_request.sg_list = &isge; | |
329 | rx_work_request.num_sge = 1; | |
330 | ||
331 | ibv_recv_wr *badWorkRequest; | |
332 | int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest); | |
333 | if (ret) | |
334 | return -errno; | |
335 | return 0; | |
336 | } | |
337 | ||
338 | int Device::post_channel_cluster() | |
339 | { | |
340 | vector<Chunk*> free_chunks; | |
341 | int r = memory_manager->get_channel_buffers(free_chunks, 0); | |
342 | assert(r > 0); | |
343 | for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) { | |
344 | r = post_chunk(*iter); | |
345 | assert(r == 0); | |
346 | } | |
347 | return 0; | |
348 | } | |
349 | ||
// Acquire send buffers covering `bytes` from the memory manager; chunks are
// appended to `c`. Return value is passed through unchanged from
// MemoryManager::get_send_buffers().
int Device::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return memory_manager->get_send_buffers(c, bytes);
}
354 | ||
355 | int Device::poll_tx_cq(int n, ibv_wc *wc) | |
356 | { | |
357 | if (!initialized) | |
358 | return 0; | |
359 | ||
360 | return tx_cq->poll_cq(n, wc); | |
361 | } | |
362 | ||
363 | int Device::poll_rx_cq(int n, ibv_wc *wc) | |
364 | { | |
365 | if (!initialized) | |
366 | return 0; | |
367 | ||
368 | return rx_cq->poll_cq(n, wc); | |
369 | } | |
370 | ||
371 | void Device::rearm_cqs() | |
372 | { | |
373 | int ret; | |
374 | ||
375 | if (!initialized) | |
376 | return; | |
377 | ||
378 | ret = tx_cq->rearm_notify(); | |
379 | assert(!ret); | |
380 | ||
381 | ret = rx_cq->rearm_notify(); | |
382 | assert(!ret); | |
383 | } | |
384 | ||
// Drain all pending async events from the device context and hand each one
// to the dispatcher. The loop ends when ibv_get_async_event() fails; on a
// non-blocking async_fd that is expected to be EAGAIN ("no more events"),
// anything else is logged as an error.
void Device::handle_async_event()
{
  ibv_async_event async_event;

  ldout(cct, 30) << __func__ << dendl;

  while (!ibv_get_async_event(ctxt, &async_event)) {
    RDMADispatcher *d = infiniband->get_dispatcher();
    d->process_async_event(this, async_event);

    // every event obtained must be acknowledged back to the verbs library
    ibv_ack_async_event(&async_event);
  }

  if (errno != EAGAIN) {
    lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
               << " " << cpp_strerror(errno) << ")" << dendl;
  }
}
403 | ||
404 | ||
405 | DeviceList::DeviceList(CephContext *cct, Infiniband *ib) | |
406 | : cct(cct), device_list(ibv_get_device_list(&num)) | |
407 | { | |
408 | if (device_list == NULL || num == 0) { | |
409 | lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl; | |
410 | ceph_abort(); | |
411 | } | |
412 | devices = new Device*[num]; | |
413 | ||
414 | poll_fds = new struct pollfd[3 * num]; | |
415 | ||
416 | for (int i = 0; i < num; ++i) { | |
417 | struct pollfd *pfd = &poll_fds[i * 3]; | |
418 | struct Device *d; | |
419 | ||
420 | d = new Device(cct, ib, device_list[i]); | |
421 | devices[i] = d; | |
422 | ||
423 | pfd[0].fd = d->tx_cc->get_fd(); | |
424 | pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
425 | pfd[0].revents = 0; | |
426 | ||
427 | pfd[1].fd = d->rx_cc->get_fd(); | |
428 | pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
429 | pfd[1].revents = 0; | |
430 | ||
431 | pfd[2].fd = d->ctxt->async_fd; | |
432 | pfd[2].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
433 | pfd[2].revents = 0; | |
434 | } | |
435 | } | |
436 | ||
437 | DeviceList::~DeviceList() | |
438 | { | |
439 | delete[] poll_fds; | |
440 | ||
441 | for (int i=0; i < num; ++i) { | |
442 | delete devices[i]; | |
443 | } | |
444 | delete []devices; | |
445 | ibv_free_device_list(device_list); | |
446 | } | |
447 | ||
448 | Device* DeviceList::get_device(const char* device_name) | |
449 | { | |
450 | assert(devices); | |
451 | for (int i = 0; i < num; ++i) { | |
452 | if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) { | |
453 | return devices[i]; | |
454 | } | |
455 | } | |
456 | return NULL; | |
457 | } | |
458 | ||
459 | ||
460 | Device* DeviceList::get_device(const struct ibv_context *ctxt) | |
461 | { | |
462 | ibv_device *device = ctxt->device; | |
463 | ||
464 | assert(devices); | |
465 | for (int i = 0; i < num; ++i) { | |
466 | if (devices[i]->ctxt->device == device) { | |
467 | return devices[i]; | |
468 | } | |
469 | } | |
470 | ||
471 | return NULL; | |
472 | } | |
473 | ||
474 | int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc) | |
475 | { | |
476 | int n = 0; | |
477 | ||
478 | for (int i = 0; i < num; i++) { | |
479 | *d = devices[++last_poll_dev % num]; | |
480 | ||
481 | n = (*d)->poll_tx_cq(num_entries, wc); | |
482 | if (n) | |
483 | break; | |
484 | } | |
485 | ||
486 | return n; | |
487 | } | |
488 | ||
489 | int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc) | |
490 | { | |
491 | int n = 0; | |
492 | ||
493 | for (int i = 0; i < num; i++) { | |
494 | *d = devices[++last_poll_dev % num]; | |
495 | ||
496 | n = (*d)->poll_rx_cq(num_entries, wc); | |
497 | if (n) | |
498 | break; | |
499 | } | |
500 | ||
501 | return n; | |
502 | } | |
503 | ||
// Block in 100ms slices until some device reports a tx/rx completion event
// or async event, or until `done` becomes true. Returns poll()'s positive
// ready count, or 0 when the loop stopped because `done` was set (poll()
// errors abort the process, so a negative return never escapes).
int DeviceList::poll_blocking(bool &done)
{
  int r = 0;
  while (!done && r == 0) {
    // 100ms timeout so `done` is re-checked periodically
    r = poll(poll_fds, num * 3, 100);
    if (r < 0) {
      r = -errno;
      lderr(cct) << __func__ << " poll failed " << r << dendl;
      ceph_abort();
    }
  }

  if (r <= 0)
    return r;

  // Something is ready: drain cq events and async events on every device
  // rather than inspecting revents per fd.
  for (int i = 0; i < num ; i++) {
    Device *d = devices[i];

    if (d->tx_cc->get_cq_event())
      ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl;

    if (d->rx_cc->get_cq_event())
      ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl;

    d->handle_async_event();
  }

  return r;
}
533 | ||
534 | void DeviceList::rearm_notify() | |
535 | { | |
536 | for (int i = 0; i < num; i++) | |
537 | devices[i]->rearm_cqs(); | |
538 | } | |
539 | ||
540 | void DeviceList::handle_async_event() | |
541 | { | |
542 | for (int i = 0; i < num; i++) | |
543 | devices[i]->handle_async_event(); | |
544 | } | |
545 | ||
546 | void DeviceList::uninit() | |
547 | { | |
548 | for (int i = 0; i < num; i++) | |
549 | devices[i]->uninit(); | |
550 | } |