]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
31f18b77 | 17 | #include <poll.h> |
11fdf7f2 | 18 | #include <errno.h> |
7c673cae FG |
19 | #include <sys/time.h> |
20 | #include <sys/resource.h> | |
21 | ||
22 | #include "include/str_list.h" | |
11fdf7f2 TL |
23 | #include "include/compat.h" |
24 | #include "common/Cycles.h" | |
7c673cae FG |
25 | #include "common/deleter.h" |
26 | #include "common/Tub.h" | |
27 | #include "RDMAStack.h" | |
7c673cae FG |
28 | |
29 | #define dout_subsys ceph_subsys_ms | |
30 | #undef dout_prefix | |
31 | #define dout_prefix *_dout << "RDMAStack " | |
32 | ||
7c673cae FG |
// Tear down the dispatcher: stop the polling thread first, then verify that
// every queue pair was already unregistered/destroyed before we go away.
RDMADispatcher::~RDMADispatcher()
{
  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
  polling_stop();

  // By this point all connections must have been torn down and all dead
  // QPs reaped by polling(); anything left would leak verbs resources.
  ceph_assert(qp_conns.empty());
  ceph_assert(num_qp_conn == 0);
  ceph_assert(dead_queue_pairs.empty());
}
42 | ||
f67539c2 | 43 | RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib) |
9f95a23c | 44 | : cct(c), ib(ib) |
7c673cae FG |
45 | { |
46 | PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); | |
47 | ||
48 | plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); | |
49 | plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks"); | |
11fdf7f2 TL |
50 | plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed"); |
51 | plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers"); | |
7c673cae FG |
52 | |
53 | plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions"); | |
54 | plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors"); | |
55 | plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors"); | |
56 | plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors"); | |
57 | ||
58 | plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion"); | |
59 | plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion"); | |
60 | plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request"); | |
61 | ||
62 | plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events"); | |
63 | plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events"); | |
64 | ||
65 | plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors"); | |
66 | ||
67 | ||
68 | plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number"); | |
69 | plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number"); | |
70 | ||
71 | perf_logger = plb.create_perf_counters(); | |
72 | cct->get_perfcounters_collection()->add(perf_logger); | |
11fdf7f2 | 73 | Cycles::init(); |
7c673cae FG |
74 | } |
75 | ||
// Lazily start the dispatcher's polling thread.  Idempotent: if the thread is
// already running this is a no-op.  Creates the tx/rx completion channels and
// completion queues before spawning the thread, since polling() uses them
// immediately.
void RDMADispatcher::polling_start()
{
  // take lock because listen/connect can happen from different worker threads
  std::lock_guard l{lock};

  if (t.joinable())
    return; // dispatcher thread already running

  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);

  // Completion channels first, then the CQs that attach to them.
  tx_cc = ib->create_comp_channel(cct);
  ceph_assert(tx_cc);
  rx_cc = ib->create_comp_channel(cct);
  ceph_assert(rx_cc);
  tx_cq = ib->create_comp_queue(cct, tx_cc);
  ceph_assert(tx_cq);
  rx_cq = ib->create_comp_queue(cct, rx_cc);
  ceph_assert(rx_cq);

  t = std::thread(&RDMADispatcher::polling, this);
  ceph_pthread_setname(t.native_handle(), "rdma-polling");
}
98 | ||
// Ask the polling thread to exit, wait for it, then release the completion
// queues/channels it was using.  Safe to call when polling never started:
// `done` is set but the early return skips the join and teardown.
void RDMADispatcher::polling_stop()
{
  {
    std::lock_guard l{lock};
    done = true;  // polling() checks this flag on each idle iteration
  }

  if (!t.joinable())
    return;

  t.join();

  // Only torn down after the thread exits, so polling() can never touch a
  // freed CQ/CC.
  tx_cc->ack_events();
  rx_cc->ack_events();
  delete tx_cq;
  delete rx_cq;
  delete tx_cc;
  delete rx_cc;
}
118 | ||
// Drain and classify all pending verbs async events for this device.
// Called from polling() when the CQs go idle.  Loops until
// ibv_get_async_event() reports no more events (EAGAIN); every event that is
// fetched is acknowledged with ibv_ack_async_event() at the bottom of the
// loop, as the verbs API requires.
void RDMADispatcher::handle_async_event()
{
  ldout(cct, 30) << __func__ << dendl;
  while (1) {
    ibv_async_event async_event;
    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
      // EAGAIN simply means the event queue is empty; anything else is a
      // real failure worth logging.
      if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
      return;
    }
    perf_logger->inc(l_msgr_rdma_total_async_events);
    ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;

    switch (async_event.event_type) {
      /***********************CQ events********************/
      case IBV_EVENT_CQ_ERR:
        lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
                   << " CQ Overflow, dev = " << ib->get_device()->ctxt
                   << " Need destroy and recreate resource " << dendl;
        break;
      /***********************QP events********************/
      case IBV_EVENT_QP_FATAL:
        {
          /* Error occurred on a QP and it transitioned to error state */
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          QueuePair* qp = get_qp(qpn);
          lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
                     << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                     << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
        }
        break;
      case IBV_EVENT_QP_LAST_WQE_REACHED:
        {
          /*
           * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
           *    Reason: QP is force switched into Error before posting Beacon WR.
           *            The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
           *            For SRQ, only WRs on the QP which is switched into Error status will be flushed.
           *    Handle: Only confirm that qp enter into dead queue pairs
           * 2. The CQE with error was generated for the last WQE
           *    Handle: output error log
           */
          perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          // Hold the dispatcher lock so conn/qp lookups and the dead-QP
          // enqueue below are consistent with register_qp()/schedule_qp_destroy().
          std::lock_guard l{lock};
          RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
          QueuePair* qp = get_qp_lockless(qpn);

          if (qp && !qp->is_dead()) {
            lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
                       << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                       << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
          }
          if (!conn) {
            ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
                           << " qp number: " << qpn << dendl;
          } else {
            conn->fault();
            // With rdma-cm the CM layer owns QP teardown; otherwise we
            // queue the QP for deletion by polling().
            if (qp) {
              if (!cct->_conf->ms_async_rdma_cm) {
                enqueue_dead_qp_lockless(qpn);
              }
            }
          }
        }
        break;
      case IBV_EVENT_QP_REQ_ERR:
        /* Invalid Request Local Work Queue Error */
        [[fallthrough]];
      case IBV_EVENT_QP_ACCESS_ERR:
        /* Local access violation error */
        [[fallthrough]];
      case IBV_EVENT_COMM_EST:
        /* Communication was established on a QP */
        [[fallthrough]];
      case IBV_EVENT_SQ_DRAINED:
        /* Send Queue was drained of outstanding messages in progress */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG:
        /* A connection has migrated to the alternate path */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG_ERR:
        /* A connection failed to migrate to the alternate path */
        break;
      /***********************SRQ events*******************/
      case IBV_EVENT_SRQ_ERR:
        /* Error occurred on an SRQ */
        [[fallthrough]];
      case IBV_EVENT_SRQ_LIMIT_REACHED:
        /* SRQ limit was reached */
        break;
      /***********************Port events******************/
      case IBV_EVENT_PORT_ACTIVE:
        /* Link became active on a port */
        [[fallthrough]];
      case IBV_EVENT_PORT_ERR:
        /* Link became unavailable on a port */
        [[fallthrough]];
      case IBV_EVENT_LID_CHANGE:
        /* LID was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_PKEY_CHANGE:
        /* P_Key table was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_SM_CHANGE:
        /* SM was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_CLIENT_REREGISTER:
        /* SM sent a CLIENT_REREGISTER request to a port */
        [[fallthrough]];
      case IBV_EVENT_GID_CHANGE:
        /* GID table was changed on a port */
        break;

      /***********************CA events******************/
      //CA events:
      case IBV_EVENT_DEVICE_FATAL:
        /* CA is in FATAL state */
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
        break;
      default:
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " unknown event: " << async_event.event_type << dendl;
        break;
    }
    ibv_ack_async_event(&async_event);
  }
}
251 | ||
11fdf7f2 TL |
// Return a single rx chunk to the shared buffer pool (thread-safe wrapper)
// and account for it in the "in use" gauge.
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
  std::lock_guard l{lock};
  ib->post_chunk_to_pool(chunk);
  perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
258 | ||
// Post up to `num` receive buffers to the receive queue of `qp` (or the SRQ
// when qp is null — delegated to Infiniband).  Returns the number actually
// posted.  Thread-safe wrapper around the Infiniband helper.
int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
{
  std::lock_guard l{lock};
  return ib->post_chunks_to_rq(num, qp);
}
264 | ||
7c673cae FG |
265 | void RDMADispatcher::polling() |
266 | { | |
267 | static int MAX_COMPLETIONS = 32; | |
268 | ibv_wc wc[MAX_COMPLETIONS]; | |
269 | ||
270 | std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled; | |
271 | std::vector<ibv_wc> tx_cqe; | |
31f18b77 | 272 | ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; |
11fdf7f2 | 273 | uint64_t last_inactive = Cycles::rdtsc(); |
7c673cae FG |
274 | bool rearmed = false; |
275 | int r = 0; | |
276 | ||
277 | while (true) { | |
31f18b77 | 278 | int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc); |
7c673cae FG |
279 | if (tx_ret > 0) { |
280 | ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret | |
281 | << " responses."<< dendl; | |
31f18b77 | 282 | handle_tx_event(wc, tx_ret); |
7c673cae FG |
283 | } |
284 | ||
31f18b77 | 285 | int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc); |
7c673cae | 286 | if (rx_ret > 0) { |
11fdf7f2 | 287 | ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret |
7c673cae | 288 | << " responses."<< dendl; |
9f95a23c | 289 | handle_rx_event(wc, rx_ret); |
7c673cae FG |
290 | } |
291 | ||
292 | if (!tx_ret && !rx_ret) { | |
7c673cae | 293 | perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight); |
9f95a23c TL |
294 | // |
295 | // Clean up dead QPs when rx/tx CQs are in idle. The thing is that | |
296 | // we can destroy QPs even earlier, just when beacon has been received, | |
297 | // but we have two CQs (rx & tx), thus beacon WC can be poped from tx | |
298 | // CQ before other WCs are fully consumed from rx CQ. For safety, we | |
299 | // wait for beacon and then "no-events" from CQs. | |
300 | // | |
301 | // Calling size() on vector without locks is totally fine, since we | |
302 | // use it as a hint (accuracy is not important here) | |
303 | // | |
304 | if (!dead_queue_pairs.empty()) { | |
305 | decltype(dead_queue_pairs) dead_qps; | |
306 | { | |
307 | std::lock_guard l{lock}; | |
308 | dead_queue_pairs.swap(dead_qps); | |
309 | } | |
310 | ||
311 | for (auto& qp: dead_qps) { | |
312 | perf_logger->dec(l_msgr_rdma_active_queue_pair); | |
313 | ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl; | |
314 | delete qp; | |
7c673cae FG |
315 | } |
316 | } | |
9f95a23c | 317 | |
11fdf7f2 | 318 | if (!num_qp_conn && done && dead_queue_pairs.empty()) |
7c673cae FG |
319 | break; |
320 | ||
11fdf7f2 TL |
321 | uint64_t now = Cycles::rdtsc(); |
322 | if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) { | |
31f18b77 | 323 | handle_async_event(); |
7c673cae FG |
324 | if (!rearmed) { |
325 | // Clean up cq events after rearm notify ensure no new incoming event | |
326 | // arrived between polling and rearm | |
31f18b77 FG |
327 | tx_cq->rearm_notify(); |
328 | rx_cq->rearm_notify(); | |
7c673cae FG |
329 | rearmed = true; |
330 | continue; | |
331 | } | |
332 | ||
31f18b77 FG |
333 | struct pollfd channel_poll[2]; |
334 | channel_poll[0].fd = tx_cc->get_fd(); | |
9f95a23c | 335 | channel_poll[0].events = POLLIN; |
31f18b77 FG |
336 | channel_poll[0].revents = 0; |
337 | channel_poll[1].fd = rx_cc->get_fd(); | |
9f95a23c | 338 | channel_poll[1].events = POLLIN; |
31f18b77 FG |
339 | channel_poll[1].revents = 0; |
340 | r = 0; | |
7c673cae | 341 | perf_logger->set(l_msgr_rdma_polling, 0); |
31f18b77 | 342 | while (!done && r == 0) { |
11fdf7f2 | 343 | r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100)); |
31f18b77 FG |
344 | if (r < 0) { |
345 | r = -errno; | |
346 | lderr(cct) << __func__ << " poll failed " << r << dendl; | |
347 | ceph_abort(); | |
348 | } | |
349 | } | |
350 | if (r > 0 && tx_cc->get_cq_event()) | |
351 | ldout(cct, 20) << __func__ << " got tx cq event." << dendl; | |
352 | if (r > 0 && rx_cc->get_cq_event()) | |
353 | ldout(cct, 20) << __func__ << " got rx cq event." << dendl; | |
11fdf7f2 | 354 | last_inactive = Cycles::rdtsc(); |
7c673cae FG |
355 | perf_logger->set(l_msgr_rdma_polling, 1); |
356 | rearmed = false; | |
357 | } | |
358 | } | |
359 | } | |
360 | } | |
361 | ||
362 | void RDMADispatcher::notify_pending_workers() { | |
363 | if (num_pending_workers) { | |
364 | RDMAWorker *w = nullptr; | |
365 | { | |
9f95a23c | 366 | std::lock_guard l{w_lock}; |
7c673cae FG |
367 | if (!pending_workers.empty()) { |
368 | w = pending_workers.front(); | |
369 | pending_workers.pop_front(); | |
370 | --num_pending_workers; | |
371 | } | |
372 | } | |
373 | if (w) | |
374 | w->notify_worker(); | |
375 | } | |
376 | } | |
377 | ||
// Record the (QP, socket) pair under the QP's local number so completion and
// async-event handlers can route back to the owning socket.  The QP number
// must not already be registered.
void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
  std::lock_guard l{lock};
  ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
  ++num_qp_conn;
}
385 | ||
386 | RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp) | |
387 | { | |
388 | auto it = qp_conns.find(qp); | |
389 | if (it == qp_conns.end()) | |
390 | return nullptr; | |
391 | if (it->second.first->is_dead()) | |
392 | return nullptr; | |
393 | return it->second.second; | |
394 | } | |
395 | ||
9f95a23c | 396 | Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp) |
11fdf7f2 | 397 | { |
11fdf7f2 TL |
398 | // Try to find the QP in qp_conns firstly. |
399 | auto it = qp_conns.find(qp); | |
400 | if (it != qp_conns.end()) | |
401 | return it->second.first; | |
402 | ||
403 | // Try again in dead_queue_pairs. | |
404 | for (auto &i: dead_queue_pairs) | |
405 | if (i->get_local_qp_number() == qp) | |
406 | return i; | |
407 | ||
408 | return nullptr; | |
409 | } | |
410 | ||
// Thread-safe variant of get_qp_lockless(): takes the dispatcher lock and
// resolves a QP number to its QueuePair (live or dead), or nullptr.
Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
{
  std::lock_guard l{lock};
  return get_qp_lockless(qp);
}
416 | ||
f67539c2 | 417 | void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn) |
9f95a23c | 418 | { |
7c673cae | 419 | auto it = qp_conns.find(qpn); |
9f95a23c TL |
420 | if (it == qp_conns.end()) { |
421 | lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl; | |
7c673cae | 422 | return ; |
9f95a23c TL |
423 | } |
424 | QueuePair *qp = it->second.first; | |
425 | dead_queue_pairs.push_back(qp); | |
7c673cae FG |
426 | qp_conns.erase(it); |
427 | --num_qp_conn; | |
428 | } | |
429 | ||
f67539c2 TL |
// Thread-safe wrapper: take the dispatcher lock and move QP `qpn` onto the
// dead list (see enqueue_dead_qp_lockless).
void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
{
  std::lock_guard l{lock};
  enqueue_dead_qp_lockless(qpn);
}
435 | ||
// Begin tearing down the QP registered under `qpn`.  Tries to switch the QP
// into the "dead" state (which posts the beacon WR — see handle_tx_event);
// if that fails the QP is queued for immediate destruction instead.
void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
{
  std::lock_guard l{lock};
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end()) {
    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
    return;
  }
  QueuePair *qp = it->second.first;
  if (qp->to_dead()) {
    //
    // Failed to switch to dead. This is abnormal, but we can't
    // do anything, so just destroy QP.
    //
    dead_queue_pairs.push_back(qp);
    qp_conns.erase(it);
    --num_qp_conn;
  } else {
    //
    // Successfully switched to dead, thus keep entry in the map.
    // But only zero out socked pointer in order to return null from
    // get_conn_lockless();
    it->second.second = nullptr;
  }
}
461 | ||
// Process `n` work completions polled from the tx CQ.
// Beacon completions enqueue their QP for destruction; error completions are
// classified and reported to the owning socket; successful send completions
// have their chunks collected and returned to the tx buffer pool in one batch.
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];

    // If it's beacon WR, enqueue the QP to be destroyed later
    if (response->wr_id == BEACON_WRID) {
      enqueue_dead_qp(response->qp_num);
      continue;
    }

    ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
                   << " status: " << ib->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      switch(response->status) {
        case IBV_WC_RETRY_EXC_ERR:
          {
            perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);

            ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
                          << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                          << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                          << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn) {
              ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                            << conn->get_peer_qpn() << dendl;
            }
          }
          break;
        case IBV_WC_WR_FLUSH_ERR:
          {
            perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);

            std::lock_guard l{lock};
            QueuePair *qp = get_qp_lockless(response->qp_num);
            if (qp) {
              ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
            }
            // Flushes on an already-dead QP are the expected teardown path;
            // anything else is reported as an error.
            if (qp && qp->is_dead()) {
              ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
            } else {
              lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
                         << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                         << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                         << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

              RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
              if (conn) {
                ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                              << conn->get_peer_qpn() << dendl;
              }
            }
          }
          break;

        default:
          {
            lderr(cct) << __func__ << " SQ WR return error,"
                       << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                       << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                       << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn && conn->is_connected()) {
              ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
                             << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
              conn->fault();
            } else {
              ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
            }
          }
          break;
      }
    }

    auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
    //TX completion may come either from
    // 1) regular send message, WCE wr_id points to chunk
    // 2) 'fin' message, wr_id points to the QP
    if (ib->get_memory_manager()->is_valid_chunk(chunk)) {
      tx_chunks.push_back(chunk);
    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
    } else {
      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
      ceph_abort();
    }
  }

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  post_tx_buffer(tx_chunks);  // batch-return all completed send chunks
}
561 | ||
/**
 * Return the given tx Chunks to the registered-memory free pool and adjust
 * the inflight counter accordingly.  Wakes a pending worker (if any) since
 * buffers just became available.
 *
 * \param[in] chunks
 *      The Chunks to enqueue; no-op when empty.
 */
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
  if (chunks.empty())
    return ;

  inflight -= chunks.size();
  ib->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  notify_pending_workers();
}
581 | ||
9f95a23c TL |
// Process `rx_number` work completions polled from the rx CQ.
// Successful receives are grouped per owning socket and handed over via
// pass_wc(); all error/orphan completions have their chunk returned to the
// pool immediately.  The dispatcher lock is held for the whole batch so the
// sockets looked up here cannot be destroyed mid-processing.
void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
{
  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::lock_guard l{lock};//make sure connected socket alive when pass wc

  for (int i = 0; i < rx_number; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
    QueuePair *qp = get_qp_lockless(response->qp_num);

    switch (response->status) {
      case IBV_WC_SUCCESS:
        ceph_assert(response->opcode == IBV_WC_RECV);
        if (!conn) {
          // Owning socket is gone (or dead): recycle the buffer right away.
          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
                        << std::hex << chunk << " will be back." << std::dec << dendl;
          ib->post_chunk_to_pool(chunk);
          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        } else {
          // Replenish the receive queue and queue the completion for the socket.
          conn->post_chunks_to_rq(1);
          polled[conn].push_back(*response);

          // Non-SRQ QPs track their posted RQ WRs per-chunk; detach here.
          if (qp != nullptr && !qp->get_srq()) {
            qp->remove_rq_wr(chunk);
            chunk->clear_qp();
          }
        }
        break;

      case IBV_WC_WR_FLUSH_ERR:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        if (qp) {
          ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
        }
        // Flushes on a dead QP are expected during teardown.
        if (qp && qp->is_dead()) {
          ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
        } else {
          ldout(cct, 1) << __func__ << " RQ WR return error,"
                        << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                        << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                        << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
          if (conn) {
            ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
                          << conn->get_peer_qpn() << dendl;
          }
        }

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;

      default:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        ldout(cct, 1) << __func__ << " RQ WR return error,"
                      << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                      << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                      << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
        if (conn && conn->is_connected())
          conn->fault();

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;
    }
  }

  // Deliver each socket's batch of completions in one call.
  for (auto &i : polled)
    i.first->pass_wc(std::move(i.second));
  polled.clear();
}
658 | ||
659 | RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id) | |
660 | : Worker(c, worker_id), | |
661 | tx_handler(new C_handle_cq_tx(this)) | |
7c673cae FG |
662 | { |
663 | // initialize perf_logger | |
664 | char name[128]; | |
665 | sprintf(name, "AsyncMessenger::RDMAWorker-%u", id); | |
666 | PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last); | |
667 | ||
668 | plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer"); | |
669 | plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer"); | |
670 | plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted"); | |
7c673cae FG |
671 | |
672 | plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted"); | |
11fdf7f2 | 673 | plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES)); |
7c673cae | 674 | plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted"); |
11fdf7f2 | 675 | plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES)); |
224ce89b | 676 | plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns"); |
7c673cae FG |
677 | |
678 | perf_logger = plb.create_perf_counters(); | |
679 | cct->get_perfcounters_collection()->add(perf_logger); | |
680 | } | |
681 | ||
// Release the tx-completion handler allocated in the constructor.
RDMAWorker::~RDMAWorker()
{
  delete tx_handler;
}
686 | ||
// Sanity check: the dispatcher must have been injected (see
// RDMAStack's constructor) before the worker starts running.
void RDMAWorker::initialize()
{
  ceph_assert(dispatcher);
}
691 | ||
11fdf7f2 TL |
692 | int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot, |
693 | const SocketOptions &opt,ServerSocket *sock) | |
7c673cae | 694 | { |
9f95a23c | 695 | ib->init(); |
11fdf7f2 | 696 | dispatcher->polling_start(); |
9f95a23c | 697 | |
11fdf7f2 TL |
698 | RDMAServerSocketImpl *p; |
699 | if (cct->_conf->ms_async_rdma_type == "iwarp") { | |
9f95a23c | 700 | p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot); |
11fdf7f2 | 701 | } else { |
9f95a23c | 702 | p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot); |
11fdf7f2 | 703 | } |
7c673cae FG |
704 | int r = p->listen(sa, opt); |
705 | if (r < 0) { | |
706 | delete p; | |
707 | return r; | |
708 | } | |
709 | ||
710 | *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p)); | |
711 | return 0; | |
712 | } | |
713 | ||
714 | int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) | |
715 | { | |
9f95a23c | 716 | ib->init(); |
11fdf7f2 TL |
717 | dispatcher->polling_start(); |
718 | ||
719 | RDMAConnectedSocketImpl* p; | |
720 | if (cct->_conf->ms_async_rdma_type == "iwarp") { | |
9f95a23c | 721 | p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this); |
11fdf7f2 | 722 | } else { |
9f95a23c | 723 | p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this); |
11fdf7f2 | 724 | } |
7c673cae FG |
725 | int r = p->try_connect(addr, opts); |
726 | ||
727 | if (r < 0) { | |
728 | ldout(cct, 1) << __func__ << " try connecting failed." << dendl; | |
729 | delete p; | |
730 | return r; | |
731 | } | |
732 | std::unique_ptr<RDMAConnectedSocketImpl> csi(p); | |
733 | *socket = ConnectedSocket(std::move(csi)); | |
734 | return 0; | |
735 | } | |
736 | ||
// Reserve registered tx buffers for socket `o`.
// Fills `c` with up to enough chunks to cover `bytes` and returns the number
// of chunks obtained.  If the pool could not satisfy the request, the socket
// is parked on the pending list so it gets retried when buffers are freed
// (see post_tx_buffer()/notify_pending_workers()).
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
  ceph_assert(center.in_thread());
  int r = ib->get_tx_buffers(c, bytes);
  size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
  dispatcher->inflight += r;  // accounted back in post_tx_buffer()
  if (got >= bytes)
    return r;

  // Partial (or zero) allocation: queue the socket for a retry once chunks
  // are returned to the pool.
  if (o) {
    if (!o->is_pending()) {
      pending_sent_conns.push_back(o);
      perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
      o->set_pending(1);
    }
    dispatcher->make_pending_worker(this);
  }
  return r;
}
757 | ||
758 | ||
// Retry sends for every socket that was parked waiting for tx buffers.
// A socket that still cannot get buffers (-EAGAIN) is re-queued and the
// worker re-registers itself as pending; other errors fault the socket.
void RDMAWorker::handle_pending_message()
{
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    ssize_t r = o->submit(false);
    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
    if (r < 0) {
      if (r == -EAGAIN) {
        // Still no buffers: put it back and wait for the next notification.
        pending_sent_conns.push_back(o);
        dispatcher->make_pending_worker(this);
        return ;
      }
      o->fault();
    }
    o->set_pending(0);
    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
  }
  // Pass the buffer-available notification on to the next waiting worker.
  dispatcher->notify_pending_workers();
}
780 | ||
f67539c2 TL |
781 | RDMAStack::RDMAStack(CephContext *cct) |
782 | : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)), | |
783 | rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib)) | |
7c673cae | 784 | { |
7c673cae | 785 | ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; |
7c673cae FG |
786 | |
787 | unsigned num = get_num_worker(); | |
788 | for (unsigned i = 0; i < num; ++i) { | |
789 | RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i)); | |
9f95a23c TL |
790 | w->set_dispatcher(rdma_dispatcher); |
791 | w->set_ib(ib); | |
7c673cae | 792 | } |
9f95a23c | 793 | ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl; |
7c673cae FG |
794 | } |
795 | ||
796 | RDMAStack::~RDMAStack() | |
797 | { | |
31f18b77 FG |
798 | if (cct->_conf->ms_async_rdma_enable_hugepage) { |
799 | unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction | |
800 | } | |
7c673cae FG |
801 | } |
802 | ||
// Start worker `i`'s event loop on its own thread.
// NOTE(review): resize(i+1) assumes ids arrive in increasing order; a smaller
// id after a larger one would shrink the vector and destroy joinable
// threads — confirm callers always spawn sequentially.
void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
{
  threads.resize(i+1);
  threads[i] = std::thread(func);
}
808 | ||
809 | void RDMAStack::join_worker(unsigned i) | |
810 | { | |
11fdf7f2 | 811 | ceph_assert(threads.size() > i && threads[i].joinable()); |
7c673cae FG |
812 | threads[i].join(); |
813 | } |