]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
31f18b77 | 17 | #include <poll.h> |
11fdf7f2 | 18 | #include <errno.h> |
7c673cae FG |
19 | #include <sys/time.h> |
20 | #include <sys/resource.h> | |
21 | ||
22 | #include "include/str_list.h" | |
11fdf7f2 TL |
23 | #include "include/compat.h" |
24 | #include "common/Cycles.h" | |
7c673cae | 25 | #include "common/deleter.h" |
7c673cae | 26 | #include "RDMAStack.h" |
7c673cae FG |
27 | |
28 | #define dout_subsys ceph_subsys_ms | |
29 | #undef dout_prefix | |
30 | #define dout_prefix *_dout << "RDMAStack " | |
31 | ||
7c673cae FG |
32 | RDMADispatcher::~RDMADispatcher() |
33 | { | |
7c673cae | 34 | ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; |
11fdf7f2 | 35 | polling_stop(); |
7c673cae | 36 | |
11fdf7f2 TL |
37 | ceph_assert(qp_conns.empty()); |
38 | ceph_assert(num_qp_conn == 0); | |
39 | ceph_assert(dead_queue_pairs.empty()); | |
7c673cae FG |
40 | } |
41 | ||
f67539c2 | 42 | RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib) |
9f95a23c | 43 | : cct(c), ib(ib) |
7c673cae FG |
44 | { |
45 | PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); | |
46 | ||
47 | plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); | |
48 | plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks"); | |
11fdf7f2 TL |
49 | plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed"); |
50 | plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers"); | |
7c673cae FG |
51 | |
52 | plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions"); | |
53 | plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors"); | |
54 | plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors"); | |
55 | plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors"); | |
56 | ||
57 | plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion"); | |
58 | plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion"); | |
59 | plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request"); | |
60 | ||
61 | plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events"); | |
62 | plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events"); | |
63 | ||
64 | plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors"); | |
65 | ||
66 | ||
67 | plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number"); | |
68 | plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number"); | |
69 | ||
70 | perf_logger = plb.create_perf_counters(); | |
71 | cct->get_perfcounters_collection()->add(perf_logger); | |
11fdf7f2 | 72 | Cycles::init(); |
7c673cae FG |
73 | } |
74 | ||
// Lazily start the single dispatcher polling thread. Safe to call from
// multiple workers: the first caller creates the completion channels,
// completion queues, and the thread; later callers see t.joinable() and
// return immediately.
void RDMADispatcher::polling_start()
{
  // take lock because listen/connect can happen from different worker threads
  std::lock_guard l{lock};

  if (t.joinable())
    return; // dispatcher thread already running

  // Hook the rx-buffer gauges into the memory manager before any
  // completions can arrive.
  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);

  // Channels must exist before the CQs that are bound to them.
  tx_cc = ib->create_comp_channel(cct);
  ceph_assert(tx_cc);
  rx_cc = ib->create_comp_channel(cct);
  ceph_assert(rx_cc);
  tx_cq = ib->create_comp_queue(cct, tx_cc);
  ceph_assert(tx_cq);
  rx_cq = ib->create_comp_queue(cct, rx_cc);
  ceph_assert(rx_cq);

  t = std::thread(&RDMADispatcher::polling, this);
  ceph_pthread_setname(t.native_handle(), "rdma-polling");
}
97 | ||
// Signal the polling thread to exit, join it, then release the
// completion queues and channels it was draining. No-op (beyond setting
// `done`) if the thread was never started.
void RDMADispatcher::polling_stop()
{
  {
    // `done` is read by the polling loop; set it under the lock.
    std::lock_guard l{lock};
    done = true;
  }

  if (!t.joinable())
    return; // polling thread was never started; no CQ/CC to clean up

  t.join();

  // Ack any outstanding CQ events before destroying the queues, then
  // destroy CQs before the channels they are attached to.
  tx_cc->ack_events();
  rx_cc->ack_events();
  delete tx_cq;
  delete rx_cq;
  delete tx_cc;
  delete rx_cc;
}
117 | ||
// Drain all pending asynchronous events from the ibverbs device and log
// or react to each. Called from the polling thread when the CQs have
// been idle for a while. Loops until ibv_get_async_event() fails with
// EAGAIN (no more events on the non-blocking async fd), acking every
// event before fetching the next.
void RDMADispatcher::handle_async_event()
{
  ldout(cct, 30) << __func__ << dendl;
  while (1) {
    ibv_async_event async_event;
    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
      // EAGAIN just means the queue is drained; anything else is an error.
      if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
      return;
    }
    perf_logger->inc(l_msgr_rdma_total_async_events);
    ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;

    switch (async_event.event_type) {
      /***********************CQ events********************/
      case IBV_EVENT_CQ_ERR:
        lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
                   << " CQ Overflow, dev = " << ib->get_device()->ctxt
                   << " Need destroy and recreate resource " << dendl;
        break;
      /***********************QP events********************/
      case IBV_EVENT_QP_FATAL:
        {
          /* Error occurred on a QP and it transitioned to error state */
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          QueuePair* qp = get_qp(qpn);
          lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
                     << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                     << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
        }
        break;
      case IBV_EVENT_QP_LAST_WQE_REACHED:
        {
          /*
           * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
           *    Reason: QP is force switched into Error before posting Beacon WR.
           *            The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
           *            For SRQ, only WRs on the QP which is switched into Error status will be flushed.
           *    Handle: Only confirm that qp enter into dead queue pairs
           * 2. The CQE with error was generated for the last WQE
           *    Handle: output error log
           */
          perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          // Hold the lock across both lookups so conn and qp stay coherent.
          std::lock_guard l{lock};
          RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
          QueuePair* qp = get_qp_lockless(qpn);

          if (qp && !qp->is_dead()) {
            lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
                       << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                       << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
          }
          if (!conn) {
            ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
                           << " qp number: " << qpn << dendl;
          } else {
            conn->fault();
            if (qp) {
              // With librdmacm the CM owns QP teardown; otherwise queue the
              // QP for destruction by the polling loop.
              if (!cct->_conf->ms_async_rdma_cm) {
                enqueue_dead_qp_lockless(qpn);
              }
            }
          }
        }
        break;
      case IBV_EVENT_QP_REQ_ERR:
        /* Invalid Request Local Work Queue Error */
        [[fallthrough]];
      case IBV_EVENT_QP_ACCESS_ERR:
        /* Local access violation error */
        [[fallthrough]];
      case IBV_EVENT_COMM_EST:
        /* Communication was established on a QP */
        [[fallthrough]];
      case IBV_EVENT_SQ_DRAINED:
        /* Send Queue was drained of outstanding messages in progress */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG:
        /* A connection has migrated to the alternate path */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG_ERR:
        /* A connection failed to migrate to the alternate path */
        break;
      /***********************SRQ events*******************/
      case IBV_EVENT_SRQ_ERR:
        /* Error occurred on an SRQ */
        [[fallthrough]];
      case IBV_EVENT_SRQ_LIMIT_REACHED:
        /* SRQ limit was reached */
        break;
      /***********************Port events******************/
      case IBV_EVENT_PORT_ACTIVE:
        /* Link became active on a port */
        [[fallthrough]];
      case IBV_EVENT_PORT_ERR:
        /* Link became unavailable on a port */
        [[fallthrough]];
      case IBV_EVENT_LID_CHANGE:
        /* LID was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_PKEY_CHANGE:
        /* P_Key table was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_SM_CHANGE:
        /* SM was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_CLIENT_REREGISTER:
        /* SM sent a CLIENT_REREGISTER request to a port */
        [[fallthrough]];
      case IBV_EVENT_GID_CHANGE:
        /* GID table was changed on a port */
        break;

      /***********************CA events******************/
      //CA events:
      case IBV_EVENT_DEVICE_FATAL:
        /* CA is in FATAL state */
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
        break;
      default:
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " unknown event: " << async_event.event_type << dendl;
        break;
    }
    // Every successfully fetched event must be acked exactly once.
    ibv_ack_async_event(&async_event);
  }
}
250 | ||
11fdf7f2 TL |
251 | void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) |
252 | { | |
9f95a23c TL |
253 | std::lock_guard l{lock}; |
254 | ib->post_chunk_to_pool(chunk); | |
11fdf7f2 TL |
255 | perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); |
256 | } | |
257 | ||
9f95a23c | 258 | int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp) |
11fdf7f2 | 259 | { |
9f95a23c TL |
260 | std::lock_guard l{lock}; |
261 | return ib->post_chunks_to_rq(num, qp); | |
11fdf7f2 TL |
262 | } |
263 | ||
7c673cae FG |
264 | void RDMADispatcher::polling() |
265 | { | |
266 | static int MAX_COMPLETIONS = 32; | |
267 | ibv_wc wc[MAX_COMPLETIONS]; | |
268 | ||
269 | std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled; | |
270 | std::vector<ibv_wc> tx_cqe; | |
31f18b77 | 271 | ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; |
11fdf7f2 | 272 | uint64_t last_inactive = Cycles::rdtsc(); |
7c673cae FG |
273 | bool rearmed = false; |
274 | int r = 0; | |
275 | ||
276 | while (true) { | |
31f18b77 | 277 | int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc); |
7c673cae FG |
278 | if (tx_ret > 0) { |
279 | ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret | |
280 | << " responses."<< dendl; | |
31f18b77 | 281 | handle_tx_event(wc, tx_ret); |
7c673cae FG |
282 | } |
283 | ||
31f18b77 | 284 | int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc); |
7c673cae | 285 | if (rx_ret > 0) { |
11fdf7f2 | 286 | ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret |
7c673cae | 287 | << " responses."<< dendl; |
9f95a23c | 288 | handle_rx_event(wc, rx_ret); |
7c673cae FG |
289 | } |
290 | ||
291 | if (!tx_ret && !rx_ret) { | |
7c673cae | 292 | perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight); |
9f95a23c TL |
293 | // |
294 | // Clean up dead QPs when rx/tx CQs are in idle. The thing is that | |
295 | // we can destroy QPs even earlier, just when beacon has been received, | |
296 | // but we have two CQs (rx & tx), thus beacon WC can be poped from tx | |
297 | // CQ before other WCs are fully consumed from rx CQ. For safety, we | |
298 | // wait for beacon and then "no-events" from CQs. | |
299 | // | |
300 | // Calling size() on vector without locks is totally fine, since we | |
301 | // use it as a hint (accuracy is not important here) | |
302 | // | |
303 | if (!dead_queue_pairs.empty()) { | |
304 | decltype(dead_queue_pairs) dead_qps; | |
305 | { | |
306 | std::lock_guard l{lock}; | |
307 | dead_queue_pairs.swap(dead_qps); | |
308 | } | |
309 | ||
310 | for (auto& qp: dead_qps) { | |
311 | perf_logger->dec(l_msgr_rdma_active_queue_pair); | |
312 | ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl; | |
313 | delete qp; | |
7c673cae FG |
314 | } |
315 | } | |
9f95a23c | 316 | |
11fdf7f2 | 317 | if (!num_qp_conn && done && dead_queue_pairs.empty()) |
7c673cae FG |
318 | break; |
319 | ||
11fdf7f2 TL |
320 | uint64_t now = Cycles::rdtsc(); |
321 | if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) { | |
31f18b77 | 322 | handle_async_event(); |
7c673cae FG |
323 | if (!rearmed) { |
324 | // Clean up cq events after rearm notify ensure no new incoming event | |
325 | // arrived between polling and rearm | |
31f18b77 FG |
326 | tx_cq->rearm_notify(); |
327 | rx_cq->rearm_notify(); | |
7c673cae FG |
328 | rearmed = true; |
329 | continue; | |
330 | } | |
331 | ||
31f18b77 FG |
332 | struct pollfd channel_poll[2]; |
333 | channel_poll[0].fd = tx_cc->get_fd(); | |
9f95a23c | 334 | channel_poll[0].events = POLLIN; |
31f18b77 FG |
335 | channel_poll[0].revents = 0; |
336 | channel_poll[1].fd = rx_cc->get_fd(); | |
9f95a23c | 337 | channel_poll[1].events = POLLIN; |
31f18b77 FG |
338 | channel_poll[1].revents = 0; |
339 | r = 0; | |
7c673cae | 340 | perf_logger->set(l_msgr_rdma_polling, 0); |
31f18b77 | 341 | while (!done && r == 0) { |
11fdf7f2 | 342 | r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100)); |
31f18b77 FG |
343 | if (r < 0) { |
344 | r = -errno; | |
345 | lderr(cct) << __func__ << " poll failed " << r << dendl; | |
346 | ceph_abort(); | |
347 | } | |
348 | } | |
349 | if (r > 0 && tx_cc->get_cq_event()) | |
350 | ldout(cct, 20) << __func__ << " got tx cq event." << dendl; | |
351 | if (r > 0 && rx_cc->get_cq_event()) | |
352 | ldout(cct, 20) << __func__ << " got rx cq event." << dendl; | |
11fdf7f2 | 353 | last_inactive = Cycles::rdtsc(); |
7c673cae FG |
354 | perf_logger->set(l_msgr_rdma_polling, 1); |
355 | rearmed = false; | |
356 | } | |
357 | } | |
358 | } | |
359 | } | |
360 | ||
361 | void RDMADispatcher::notify_pending_workers() { | |
362 | if (num_pending_workers) { | |
363 | RDMAWorker *w = nullptr; | |
364 | { | |
9f95a23c | 365 | std::lock_guard l{w_lock}; |
7c673cae FG |
366 | if (!pending_workers.empty()) { |
367 | w = pending_workers.front(); | |
368 | pending_workers.pop_front(); | |
369 | --num_pending_workers; | |
370 | } | |
371 | } | |
372 | if (w) | |
373 | w->notify_worker(); | |
374 | } | |
375 | } | |
376 | ||
11fdf7f2 | 377 | void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) |
7c673cae | 378 | { |
9f95a23c | 379 | std::lock_guard l{lock}; |
11fdf7f2 | 380 | ceph_assert(!qp_conns.count(qp->get_local_qp_number())); |
7c673cae FG |
381 | qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); |
382 | ++num_qp_conn; | |
7c673cae FG |
383 | } |
384 | ||
385 | RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp) | |
386 | { | |
387 | auto it = qp_conns.find(qp); | |
388 | if (it == qp_conns.end()) | |
389 | return nullptr; | |
390 | if (it->second.first->is_dead()) | |
391 | return nullptr; | |
392 | return it->second.second; | |
393 | } | |
394 | ||
9f95a23c | 395 | Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp) |
11fdf7f2 | 396 | { |
11fdf7f2 TL |
397 | // Try to find the QP in qp_conns firstly. |
398 | auto it = qp_conns.find(qp); | |
399 | if (it != qp_conns.end()) | |
400 | return it->second.first; | |
401 | ||
402 | // Try again in dead_queue_pairs. | |
403 | for (auto &i: dead_queue_pairs) | |
404 | if (i->get_local_qp_number() == qp) | |
405 | return i; | |
406 | ||
407 | return nullptr; | |
408 | } | |
409 | ||
9f95a23c | 410 | Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp) |
7c673cae | 411 | { |
9f95a23c TL |
412 | std::lock_guard l{lock}; |
413 | return get_qp_lockless(qp); | |
414 | } | |
415 | ||
f67539c2 | 416 | void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn) |
9f95a23c | 417 | { |
7c673cae | 418 | auto it = qp_conns.find(qpn); |
9f95a23c TL |
419 | if (it == qp_conns.end()) { |
420 | lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl; | |
7c673cae | 421 | return ; |
9f95a23c TL |
422 | } |
423 | QueuePair *qp = it->second.first; | |
424 | dead_queue_pairs.push_back(qp); | |
7c673cae FG |
425 | qp_conns.erase(it); |
426 | --num_qp_conn; | |
427 | } | |
428 | ||
f67539c2 TL |
429 | void RDMADispatcher::enqueue_dead_qp(uint32_t qpn) |
430 | { | |
431 | std::lock_guard l{lock}; | |
432 | enqueue_dead_qp_lockless(qpn); | |
433 | } | |
434 | ||
// Begin teardown of QP `qpn`. Tries to transition it to the "dead"
// state via to_dead(); on failure the QP is queued for immediate
// destruction, on success the map entry is kept (socket pointer zeroed)
// until the QP is reaped later.
// NOTE(review): this reads as though to_dead() returns non-zero on
// failure — confirm against QueuePair::to_dead() before relying on it.
void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
{
  std::lock_guard l{lock};
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end()) {
    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
    return;
  }
  QueuePair *qp = it->second.first;
  if (qp->to_dead()) {
    //
    // Failed to switch to dead. This is abnormal, but we can't
    // do anything, so just destroy QP.
    //
    dead_queue_pairs.push_back(qp);
    qp_conns.erase(it);
    --num_qp_conn;
  } else {
    //
    // Successfully switched to dead, thus keep entry in the map.
    // But only zero out socked pointer in order to return null from
    // get_conn_lockless();
    it->second.second = nullptr;
  }
}
460 | ||
// Process `n` work completions polled from the tx CQ. Beacon WCs queue
// their QP for destruction; error WCs are classified and logged (faulting
// the connection where appropriate); successful WCs release their tx
// chunk back to the memory manager via post_tx_buffer().
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];

    // If it's beacon WR, enqueue the QP to be destroyed later
    if (response->wr_id == BEACON_WRID) {
      enqueue_dead_qp(response->qp_num);
      continue;
    }

    ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
                   << " status: " << ib->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      switch(response->status) {
        case IBV_WC_RETRY_EXC_ERR:
          {
            // Remote side stopped ACKing: likely disconnected or in a bad state.
            perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);

            ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
                          << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                          << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                          << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn) {
              ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                            << conn->get_peer_qpn() << dendl;
            }
          }
          break;
        case IBV_WC_WR_FLUSH_ERR:
          {
            // Flush errors are expected for a QP we killed ourselves;
            // anything else is logged loudly.
            perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);

            std::lock_guard l{lock};
            QueuePair *qp = get_qp_lockless(response->qp_num);
            if (qp) {
              ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
            }
            if (qp && qp->is_dead()) {
              ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
            } else {
              lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
                         << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                         << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                         << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

              RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
              if (conn) {
                ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                              << conn->get_peer_qpn() << dendl;
              }
            }
          }
          break;

        default:
          {
            lderr(cct) << __func__ << " SQ WR return error,"
                       << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                       << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                       << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn && conn->is_connected()) {
              ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
                             << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
              conn->fault();
            } else {
              ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
            }
          }
          break;
      }
    }

    auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
    //TX completion may come either from
    // 1) regular send message, WCE wr_id points to chunk
    // 2) 'fin' message, wr_id points to the QP
    if (ib->get_memory_manager()->is_valid_chunk(chunk)) {
      tx_chunks.push_back(chunk);
    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
    } else {
      // wr_id is neither a known chunk nor this QP: memory corruption.
      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
      ceph_abort();
    }
  }

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  post_tx_buffer(tx_chunks);
}
560 | ||
/**
 * Return the given tx Chunks to the memory manager's free pool,
 * decrement the inflight counter accordingly, and wake a worker that
 * may be waiting for tx buffers.
 *
 * \param[in] chunks
 *      The Chunks to release; a no-op when empty.
 */
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
  if (chunks.empty())
    return ;

  inflight -= chunks.size();
  ib->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  // Freed buffers may unblock a worker stalled in get_reged_mem().
  notify_pending_workers();
}
580 | ||
9f95a23c TL |
// Process `rx_number` work completions polled from the rx CQ. Successful
// receives are batched per connection and delivered via pass_wc(); chunks
// whose connection is gone, and all error completions, are returned to
// the rx pool. The dispatcher lock is held for the whole batch so the
// sockets stay alive while their completions are handed over.
void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
{
  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::lock_guard l{lock};//make sure connected socket alive when pass wc

  for (int i = 0; i < rx_number; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
    QueuePair *qp = get_qp_lockless(response->qp_num);

    switch (response->status) {
      case IBV_WC_SUCCESS:
        ceph_assert(response->opcode == IBV_WC_RECV);
        if (!conn) {
          // Socket already torn down: recycle the chunk immediately.
          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
                        << std::hex << chunk << " will be back." << std::dec << dendl;
          ib->post_chunk_to_pool(chunk);
          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        } else {
          // Top up the receive queue and queue the WC for delivery.
          conn->post_chunks_to_rq(1);
          polled[conn].push_back(*response);

          if (qp != nullptr && !qp->get_srq()) {
            // Non-SRQ QP tracks its own outstanding RQ WRs.
            qp->remove_rq_wr(chunk);
            chunk->clear_qp();
          }
        }
        break;

      case IBV_WC_WR_FLUSH_ERR:
        // Expected when we killed the QP ourselves; otherwise logged.
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        if (qp) {
          ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
        }
        if (qp && qp->is_dead()) {
          ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
        } else {
          ldout(cct, 1) << __func__ << " RQ WR return error,"
                        << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                        << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                        << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
          if (conn) {
            ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
                          << conn->get_peer_qpn() << dendl;
          }
        }

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;

      default:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        ldout(cct, 1) << __func__ << " RQ WR return error,"
                      << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                      << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                      << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
        if (conn && conn->is_connected())
          conn->fault();

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;
    }
  }

  // Deliver the batched completions to their sockets.
  for (auto &i : polled)
    i.first->pass_wc(std::move(i.second));
  polled.clear();
}
657 | ||
658 | RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id) | |
659 | : Worker(c, worker_id), | |
660 | tx_handler(new C_handle_cq_tx(this)) | |
7c673cae FG |
661 | { |
662 | // initialize perf_logger | |
663 | char name[128]; | |
664 | sprintf(name, "AsyncMessenger::RDMAWorker-%u", id); | |
665 | PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last); | |
666 | ||
667 | plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer"); | |
668 | plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer"); | |
669 | plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted"); | |
7c673cae FG |
670 | |
671 | plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted"); | |
11fdf7f2 | 672 | plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES)); |
7c673cae | 673 | plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted"); |
11fdf7f2 | 674 | plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES)); |
224ce89b | 675 | plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns"); |
7c673cae FG |
676 | |
677 | perf_logger = plb.create_perf_counters(); | |
678 | cct->get_perfcounters_collection()->add(perf_logger); | |
679 | } | |
680 | ||
// Destructor: releases the tx completion handler allocated in the ctor.
RDMAWorker::~RDMAWorker()
{
  delete tx_handler;
}
685 | ||
// Sanity check: the shared dispatcher must have been injected
// (via set_dispatcher()) before the worker starts running.
void RDMAWorker::initialize()
{
  ceph_assert(dispatcher);
}
690 | ||
11fdf7f2 TL |
691 | int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot, |
692 | const SocketOptions &opt,ServerSocket *sock) | |
7c673cae | 693 | { |
9f95a23c | 694 | ib->init(); |
11fdf7f2 | 695 | dispatcher->polling_start(); |
9f95a23c | 696 | |
11fdf7f2 TL |
697 | RDMAServerSocketImpl *p; |
698 | if (cct->_conf->ms_async_rdma_type == "iwarp") { | |
9f95a23c | 699 | p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot); |
11fdf7f2 | 700 | } else { |
9f95a23c | 701 | p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot); |
11fdf7f2 | 702 | } |
7c673cae FG |
703 | int r = p->listen(sa, opt); |
704 | if (r < 0) { | |
705 | delete p; | |
706 | return r; | |
707 | } | |
708 | ||
709 | *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p)); | |
710 | return 0; | |
711 | } | |
712 | ||
713 | int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) | |
714 | { | |
9f95a23c | 715 | ib->init(); |
11fdf7f2 TL |
716 | dispatcher->polling_start(); |
717 | ||
718 | RDMAConnectedSocketImpl* p; | |
719 | if (cct->_conf->ms_async_rdma_type == "iwarp") { | |
9f95a23c | 720 | p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this); |
11fdf7f2 | 721 | } else { |
9f95a23c | 722 | p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this); |
11fdf7f2 | 723 | } |
7c673cae FG |
724 | int r = p->try_connect(addr, opts); |
725 | ||
726 | if (r < 0) { | |
727 | ldout(cct, 1) << __func__ << " try connecting failed." << dendl; | |
728 | delete p; | |
729 | return r; | |
730 | } | |
731 | std::unique_ptr<RDMAConnectedSocketImpl> csi(p); | |
732 | *socket = ConnectedSocket(std::move(csi)); | |
733 | return 0; | |
734 | } | |
735 | ||
736 | int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes) | |
737 | { | |
11fdf7f2 | 738 | ceph_assert(center.in_thread()); |
9f95a23c TL |
739 | int r = ib->get_tx_buffers(c, bytes); |
740 | size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r; | |
7c673cae | 741 | ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; |
9f95a23c | 742 | dispatcher->inflight += r; |
224ce89b | 743 | if (got >= bytes) |
7c673cae FG |
744 | return r; |
745 | ||
746 | if (o) { | |
224ce89b | 747 | if (!o->is_pending()) { |
7c673cae | 748 | pending_sent_conns.push_back(o); |
224ce89b WB |
749 | perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1); |
750 | o->set_pending(1); | |
751 | } | |
7c673cae FG |
752 | dispatcher->make_pending_worker(this); |
753 | } | |
754 | return r; | |
755 | } | |
756 | ||
757 | ||
// Retry sends for sockets that previously stalled on tx-buffer shortage.
// Each socket is popped and submitted; -EAGAIN re-queues it and re-arms
// the worker for the next buffer-available notification, other errors
// fault the socket. Finally the notification is forwarded so another
// waiting worker (if any) gets its turn.
void RDMAWorker::handle_pending_message()
{
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    ssize_t r = o->submit(false);
    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
    if (r < 0) {
      if (r == -EAGAIN) {
        // Still no buffers: park the socket again and wait for the next
        // notification; remaining sockets stay queued too.
        pending_sent_conns.push_back(o);
        dispatcher->make_pending_worker(this);
        return ;
      }
      o->fault();
    }
    o->set_pending(0);
    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
  }
  // Pass the buffer-available notification on to other pending workers.
  dispatcher->notify_pending_workers();
}
779 | ||
f67539c2 TL |
780 | RDMAStack::RDMAStack(CephContext *cct) |
781 | : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)), | |
782 | rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib)) | |
7c673cae | 783 | { |
7c673cae | 784 | ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; |
9f95a23c | 785 | ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl; |
7c673cae FG |
786 | } |
787 | ||
// Destructor: undo the hugepage environment tweak made when the stack
// was configured with ms_async_rdma_enable_hugepage.
RDMAStack::~RDMAStack()
{
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    unsetenv("RDMAV_HUGEPAGES_SAFE");	//remove env variable on destruction
  }
}
794 | ||
20effc67 TL |
795 | Worker* RDMAStack::create_worker(CephContext *c, unsigned worker_id) |
796 | { | |
797 | auto w = new RDMAWorker(c, worker_id); | |
798 | w->set_dispatcher(rdma_dispatcher); | |
799 | w->set_ib(ib); | |
800 | return w; | |
801 | } | |
802 | ||
// Launch a worker thread running `func`; the thread handle is kept in
// `threads` so join_worker() can reap it later.
void RDMAStack::spawn_worker(std::function<void ()> &&func)
{
  threads.emplace_back(std::move(func));
}
807 | ||
// Block until worker thread `i` exits; `i` must index a joinable thread
// previously created by spawn_worker().
void RDMAStack::join_worker(unsigned i)
{
  ceph_assert(threads.size() > i && threads[i].joinable());
  threads[i].join();
}