]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | #include "RDMAStack.h" |
2 | ||
3 | #define dout_subsys ceph_subsys_ms | |
4 | #undef dout_prefix | |
5 | #define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl " | |
6 | ||
7 | #define TIMEOUT_MS 3000 | |
8 | #define RETRY_COUNT 7 | |
9 | ||
10 | RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, | |
11 | RDMAWorker *w, RDMACMInfo *info) | |
12 | : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this)) | |
13 | { | |
14 | status = IDLE; | |
15 | notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); | |
16 | if (info) { | |
17 | is_server = true; | |
18 | cm_id = info->cm_id; | |
19 | cm_channel = info->cm_channel; | |
20 | status = RDMA_ID_CREATED; | |
21 | remote_qpn = info->qp_num; | |
22 | if (alloc_resource()) { | |
23 | close_notify(); | |
24 | return; | |
25 | } | |
26 | worker->center.submit_to(worker->center.get_id(), [this]() { | |
27 | worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler); | |
28 | status = CHANNEL_FD_CREATED; | |
29 | }, false); | |
30 | status = RESOURCE_ALLOCATED; | |
31 | local_qpn = qp->get_local_qp_number(); | |
32 | my_msg.qpn = local_qpn; | |
33 | } else { | |
34 | is_server = false; | |
35 | cm_channel = rdma_create_event_channel(); | |
36 | rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP); | |
37 | status = RDMA_ID_CREATED; | |
38 | ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl; | |
39 | } | |
40 | } | |
41 | ||
42 | RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() { | |
43 | ldout(cct, 20) << __func__ << " destruct." << dendl; | |
44 | std::unique_lock l(close_mtx); | |
45 | close_condition.wait(l, [&] { return closed; }); | |
46 | if (status >= RDMA_ID_CREATED) { | |
47 | rdma_destroy_id(cm_id); | |
48 | rdma_destroy_event_channel(cm_channel); | |
49 | } | |
50 | } | |
51 | ||
52 | int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { | |
53 | worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler); | |
54 | status = CHANNEL_FD_CREATED; | |
55 | if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) { | |
56 | lderr(cct) << __func__ << " failed to resolve addr" << dendl; | |
57 | return -1; | |
58 | } | |
59 | return 0; | |
60 | } | |
61 | ||
62 | void RDMAIWARPConnectedSocketImpl::close() { | |
63 | error = ECONNRESET; | |
64 | active = false; | |
65 | if (status >= CONNECTED) { | |
66 | rdma_disconnect(cm_id); | |
67 | } | |
68 | close_notify(); | |
69 | } | |
70 | ||
71 | void RDMAIWARPConnectedSocketImpl::shutdown() { | |
72 | error = ECONNRESET; | |
73 | active = false; | |
74 | } | |
75 | ||
// Fetch one event from the cm event channel and advance the connection state
// accordingly (presumably invoked through cm_con_handler when the channel fd
// becomes readable -- confirm against C_handle_cm_connection).
//
//  - ADDR_RESOLVED   : start route resolution.
//  - ROUTE_RESOLVED  : allocate the queue pair and issue rdma_connect().
//  - ESTABLISHED     : mark connected; on the client side also record the
//                      remote QP number, activate the socket and notify.
//  - error/rejected  : set connected to -ECONNREFUSED and notify.
//  - DISCONNECTED    : signal closure and report ECONNRESET if no error was
//                      recorded yet.
//  - DEVICE_REMOVAL  : ignored.
//  - anything else   : aborts.
//
// Every failure path sets `connected` to -ECONNREFUSED before notify(), so a
// waiter does not see a notification with `connected` still at its previous
// value.
void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
  struct rdma_cm_event *event = nullptr;
  // Without a valid event there is nothing to dispatch on and nothing to ack.
  if (rdma_get_cm_event(cm_channel, &event) || !event) {
    lderr(cct) << __func__ << " failed to get rdma cm event" << dendl;
    return;
  }
  ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event)
                 << " (cm id: " << cm_id << ")" << dendl;
  struct rdma_conn_param cm_params;
  switch (event->event) {
    case RDMA_CM_EVENT_ADDR_RESOLVED:
      status = ADDR_RESOLVED;
      if (rdma_resolve_route(cm_id, TIMEOUT_MS)) {
        lderr(cct) << __func__ << " failed to resolve rdma route" << dendl;
        connected = -ECONNREFUSED;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ROUTE_RESOLVED:
      status = ROUTE_RESOLVED;
      if (alloc_resource()) {
        lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl;
        connected = -ECONNREFUSED;
        notify();
        break;
      }
      local_qpn = qp->get_local_qp_number();
      my_msg.qpn = local_qpn;

      // FIPS zeroization audit 20191115: this memset is not security related.
      memset(&cm_params, 0, sizeof(cm_params));
      cm_params.retry_count = RETRY_COUNT;
      cm_params.qp_num = local_qpn;
      if (rdma_connect(cm_id, &cm_params)) {
        lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl;
        connected = -ECONNREFUSED;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ESTABLISHED:
      ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
      status = CONNECTED;
      if (!is_server) {
        remote_qpn = event->param.conn.qp_num;
        activate();
        notify();
      }
      break;

    case RDMA_CM_EVENT_ADDR_ERROR:
    case RDMA_CM_EVENT_ROUTE_ERROR:
    case RDMA_CM_EVENT_CONNECT_ERROR:
    case RDMA_CM_EVENT_UNREACHABLE:
    case RDMA_CM_EVENT_REJECTED:
      lderr(cct) << __func__ << " rdma connection rejected" << dendl;
      connected = -ECONNREFUSED;
      notify();
      break;

    case RDMA_CM_EVENT_DISCONNECTED:
      status = DISCONNECTED;
      close_notify();
      if (!error) {
        error = ECONNRESET;
        notify();
      }
      break;

    case RDMA_CM_EVENT_DEVICE_REMOVAL:
      break;

    default:
      ceph_abort_msg("unhandled event");
      break;
  }
  rdma_ack_cm_event(event);
}
151 | ||
152 | void RDMAIWARPConnectedSocketImpl::activate() { | |
153 | ldout(cct, 30) << __func__ << dendl; | |
154 | active = true; | |
155 | connected = 1; | |
156 | } | |
157 | ||
158 | int RDMAIWARPConnectedSocketImpl::alloc_resource() { | |
159 | ldout(cct, 30) << __func__ << dendl; | |
160 | qp = infiniband->create_queue_pair(cct, dispatcher->get_tx_cq(), | |
161 | dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id); | |
162 | if (!qp) { | |
163 | return -1; | |
164 | } | |
165 | if (!cct->_conf->ms_async_rdma_support_srq) | |
166 | dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp()); | |
167 | dispatcher->register_qp(qp, this); | |
168 | dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); | |
169 | dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); | |
170 | return 0; | |
171 | } | |
172 | ||
173 | void RDMAIWARPConnectedSocketImpl::close_notify() { | |
174 | ldout(cct, 30) << __func__ << dendl; | |
175 | if (status >= CHANNEL_FD_CREATED) { | |
176 | worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE); | |
177 | } | |
178 | std::unique_lock l(close_mtx); | |
179 | if (!closed) { | |
180 | closed = true; | |
181 | close_condition.notify_all(); | |
182 | } | |
183 | } |