]>
Commit | Line | Data |
---|---|---|
a4b75251 TL |
1 | // SPDX-License-Identifier: BSD-3-Clause |
2 | /* Copyright 2016-2020, Intel Corporation */ | |
3 | ||
4 | /* | |
5 | * rpmemd_fip.c -- rpmemd libfabric provider module source file | |
6 | */ | |
7 | ||
8 | #include <stddef.h> | |
9 | #include <stdint.h> | |
10 | #include <stdlib.h> | |
11 | #include <string.h> | |
12 | #include <limits.h> | |
13 | #include <netinet/in.h> | |
14 | #include <arpa/inet.h> | |
15 | ||
16 | #include <rdma/fabric.h> | |
17 | #include <rdma/fi_domain.h> | |
18 | #include <rdma/fi_endpoint.h> | |
19 | #include <rdma/fi_cm.h> | |
20 | #include <rdma/fi_errno.h> | |
21 | ||
22 | #include "rpmemd_log.h" | |
23 | ||
24 | #include "rpmem_common.h" | |
25 | #include "rpmem_proto.h" | |
26 | #include "rpmem_fip_msg.h" | |
27 | #include "rpmem_fip_common.h" | |
28 | #include "rpmemd_fip.h" | |
29 | ||
30 | #include "os_thread.h" | |
31 | #include "util.h" | |
32 | #include "valgrind_internal.h" | |
33 | ||
/*
 * RPMEMD_FI_ERR -- log an error message followed by the libfabric
 * description of the error code e
 */
#define RPMEMD_FI_ERR(e, fmt, args...)\
	RPMEMD_LOG(ERR, fmt ": %s", ## args, fi_strerror((e)))

/*
 * RPMEMD_FI_CLOSE -- close the libfabric object f and evaluate to the
 * fi_close() result; on failure the message fmt is logged together with
 * the error description
 *
 * The result lives in a uniquely named local so the statement expression
 * never shadows a `ret` variable of the caller.
 */
#define RPMEMD_FI_CLOSE(f, fmt, args...) (\
{\
	int rpmemd_fi_close_ret_ = fi_close(&(f)->fid);\
	if (rpmemd_fi_close_ret_)\
		RPMEMD_FI_ERR(rpmemd_fi_close_ret_, fmt, ## args);\
	rpmemd_fi_close_ret_;\
})
44 | ||
/*
 * rpmem_fip_lane -- base lane structure
 */
struct rpmem_fip_lane {
	/* endpoint of the lane */
	struct fid_ep *ep;

	/* completion queue bound to the endpoint */
	struct fid_cq *cq;
};
52 | ||
53 | /* | |
54 | * rpmemd_fip_lane -- daemon's lane | |
55 | */ | |
56 | struct rpmemd_fip_lane { | |
57 | struct rpmem_fip_lane base; /* lane base structure */ | |
58 | struct rpmem_fip_msg recv; /* RECV message */ | |
59 | struct rpmem_fip_msg send; /* SEND message */ | |
60 | struct rpmem_msg_persist_resp resp; /* persist response msg buffer */ | |
61 | int send_posted; /* send buffer has been posted */ | |
62 | int recv_posted; /* recv buffer has been posted */ | |
63 | }; | |
64 | ||
65 | /* | |
66 | * rpmemd_fip_thread -- thread context | |
67 | */ | |
68 | struct rpmemd_fip_thread { | |
69 | struct rpmemd_fip *fip; /* main context */ | |
70 | os_thread_t thread; /* thread structure */ | |
71 | struct fid_cq *cq; /* per-thread completion queue */ | |
72 | struct rpmemd_fip_lane **lanes; /* lanes processed by this thread */ | |
73 | size_t nlanes; /* number of lanes processed by this thread */ | |
74 | }; | |
75 | ||
76 | /* | |
77 | * rpmemd_fip -- main context of rpmemd_fip | |
78 | */ | |
79 | struct rpmemd_fip { | |
80 | struct fi_info *fi; /* fabric interface information */ | |
81 | struct fid_fabric *fabric; /* fabric domain */ | |
82 | struct fid_domain *domain; /* fabric protection domain */ | |
83 | struct fid_eq *eq; /* event queue */ | |
84 | struct fid_pep *pep; /* passive endpoint - listener */ | |
85 | struct fid_mr *mr; /* memory region for pool */ | |
86 | ||
87 | int (*persist)(const void *addr, size_t len); /* persist function */ | |
88 | void *(*memcpy_persist)(void *pmemdest, const void *src, size_t len); | |
89 | int (*deep_persist)(const void *addr, size_t len, void *ctx); | |
90 | void *ctx; | |
91 | void *addr; /* pool's address */ | |
92 | size_t size; /* size of the pool */ | |
93 | enum rpmem_persist_method persist_method; | |
94 | ||
95 | volatile int closing; /* flag for closing background threads */ | |
96 | unsigned nlanes; /* number of lanes */ | |
97 | size_t nthreads; /* number of threads for processing */ | |
98 | size_t cq_size; /* size of completion queue */ | |
99 | size_t lanes_per_thread; /* number of lanes per thread */ | |
100 | size_t buff_size; /* size of buffer for inlined data */ | |
101 | ||
102 | struct rpmemd_fip_lane *lanes; | |
103 | struct rpmem_fip_lane rd_lane; /* lane for read operation */ | |
104 | ||
105 | void *pmsg; /* persist message buffer */ | |
106 | size_t pmsg_size; /* persist message buffer size including alignment */ | |
107 | struct fid_mr *pmsg_mr; /* persist message memory region */ | |
108 | void *pmsg_mr_desc; /* persist message local descriptor */ | |
109 | ||
110 | struct rpmem_msg_persist_resp *pres; /* persist response buffer */ | |
111 | struct fid_mr *pres_mr; /* persist response memory region */ | |
112 | void *pres_mr_desc; /* persist response local descriptor */ | |
113 | ||
114 | struct rpmemd_fip_thread *threads; | |
115 | }; | |
116 | ||
117 | /* | |
118 | * rpmemd_fip_get_pmsg -- return persist message buffer | |
119 | */ | |
120 | static inline struct rpmem_msg_persist * | |
121 | rpmemd_fip_get_pmsg(struct rpmemd_fip *fip, size_t idx) | |
122 | { | |
123 | return (struct rpmem_msg_persist *) | |
124 | ((uintptr_t)fip->pmsg + idx * fip->pmsg_size); | |
125 | } | |
126 | ||
127 | /* | |
128 | * rpmemd_fip_getinfo -- obtain fabric interface information | |
129 | */ | |
130 | static int | |
131 | rpmemd_fip_getinfo(struct rpmemd_fip *fip, const char *service, | |
132 | const char *node, enum rpmem_provider provider) | |
133 | { | |
134 | int ret; | |
135 | ||
136 | struct fi_info *hints = rpmem_fip_get_hints(provider); | |
137 | if (!hints) { | |
138 | RPMEMD_LOG(ERR, "getting fabric interface hints"); | |
139 | ret = -1; | |
140 | goto err_fi_get_hints; | |
141 | } | |
142 | ||
143 | ret = fi_getinfo(RPMEM_FIVERSION, node, service, FI_SOURCE, | |
144 | hints, &fip->fi); | |
145 | if (ret) { | |
146 | RPMEMD_FI_ERR(ret, "getting fabric interface information"); | |
147 | goto err_fi_getinfo; | |
148 | } | |
149 | ||
150 | rpmem_fip_print_info(fip->fi); | |
151 | ||
152 | fi_freeinfo(hints); | |
153 | return 0; | |
154 | err_fi_getinfo: | |
155 | fi_freeinfo(hints); | |
156 | err_fi_get_hints: | |
157 | return ret; | |
158 | } | |
159 | ||
160 | /* | |
161 | * rpmemd_fip_set_resp -- fill the response structure | |
162 | */ | |
163 | static int | |
164 | rpmemd_fip_set_resp(struct rpmemd_fip *fip, struct rpmem_resp_attr *resp) | |
165 | { | |
166 | int ret; | |
167 | if (fip->fi->addr_format == FI_SOCKADDR_IN) { | |
168 | struct sockaddr_in addr_in; | |
169 | size_t addrlen = sizeof(addr_in); | |
170 | ||
171 | ret = fi_getname(&fip->pep->fid, &addr_in, &addrlen); | |
172 | if (ret) { | |
173 | RPMEMD_FI_ERR(ret, "getting local endpoint address"); | |
174 | goto err_fi_getname; | |
175 | } | |
176 | ||
177 | if (!addr_in.sin_port) { | |
178 | RPMEMD_LOG(ERR, "dynamic allocation of port failed"); | |
179 | goto err_port; | |
180 | } | |
181 | ||
182 | resp->port = htons(addr_in.sin_port); | |
183 | } else if (fip->fi->addr_format == FI_SOCKADDR_IN6) { | |
184 | struct sockaddr_in6 addr_in6; | |
185 | size_t addrlen = sizeof(addr_in6); | |
186 | ||
187 | ret = fi_getname(&fip->pep->fid, &addr_in6, &addrlen); | |
188 | if (ret) { | |
189 | RPMEMD_FI_ERR(ret, "getting local endpoint address"); | |
190 | goto err_fi_getname; | |
191 | } | |
192 | ||
193 | if (!addr_in6.sin6_port) { | |
194 | RPMEMD_LOG(ERR, "dynamic allocation of port failed"); | |
195 | goto err_port; | |
196 | } | |
197 | ||
198 | resp->port = htons(addr_in6.sin6_port); | |
199 | } else { | |
200 | RPMEMD_LOG(ERR, "invalid address format"); | |
201 | return -1; | |
202 | } | |
203 | ||
204 | resp->rkey = fi_mr_key(fip->mr); | |
205 | resp->persist_method = fip->persist_method; | |
206 | resp->raddr = (uint64_t)fip->addr; | |
207 | resp->nlanes = fip->nlanes; | |
208 | ||
209 | return 0; | |
210 | err_port: | |
211 | err_fi_getname: | |
212 | return -1; | |
213 | } | |
214 | ||
215 | /* | |
216 | * rpmemd_fip_init_fabric_res -- initialize common fabric's resources | |
217 | */ | |
218 | static int | |
219 | rpmemd_fip_init_fabric_res(struct rpmemd_fip *fip) | |
220 | { | |
221 | int ret; | |
222 | ret = fi_fabric(fip->fi->fabric_attr, &fip->fabric, NULL); | |
223 | if (ret) { | |
224 | RPMEMD_FI_ERR(ret, "opening fabric domain"); | |
225 | goto err_fi_fabric; | |
226 | } | |
227 | ||
228 | ret = fi_domain(fip->fabric, fip->fi, &fip->domain, NULL); | |
229 | if (ret) { | |
230 | RPMEMD_FI_ERR(ret, "opening fabric access domain"); | |
231 | goto err_fi_domain; | |
232 | } | |
233 | ||
234 | struct fi_eq_attr eq_attr = { | |
235 | .size = 0, /* use default */ | |
236 | .flags = 0, | |
237 | .wait_obj = FI_WAIT_UNSPEC, | |
238 | .signaling_vector = 0, | |
239 | .wait_set = NULL, | |
240 | }; | |
241 | ||
242 | ret = fi_eq_open(fip->fabric, &eq_attr, &fip->eq, NULL); | |
243 | if (ret) { | |
244 | RPMEMD_FI_ERR(ret, "opening event queue"); | |
245 | goto err_eq_open; | |
246 | } | |
247 | ||
248 | ret = fi_passive_ep(fip->fabric, fip->fi, &fip->pep, NULL); | |
249 | if (ret) { | |
250 | RPMEMD_FI_ERR(ret, "allocating passive endpoint"); | |
251 | goto err_pep; | |
252 | } | |
253 | ||
254 | ret = fi_pep_bind(fip->pep, &fip->eq->fid, 0); | |
255 | if (ret) { | |
256 | RPMEMD_FI_ERR(ret, "binding event queue to passive endpoint"); | |
257 | goto err_pep_bind_eq; | |
258 | } | |
259 | ||
260 | return 0; | |
261 | err_pep_bind_eq: | |
262 | RPMEMD_FI_CLOSE(fip->pep, "closing passive endpoint"); | |
263 | err_pep: | |
264 | RPMEMD_FI_CLOSE(fip->eq, "closing event queue"); | |
265 | err_eq_open: | |
266 | RPMEMD_FI_CLOSE(fip->domain, "closing fabric access domain"); | |
267 | err_fi_domain: | |
268 | RPMEMD_FI_CLOSE(fip->fabric, "closing fabric domain"); | |
269 | err_fi_fabric: | |
270 | return ret; | |
271 | } | |
272 | ||
273 | /* | |
274 | * rpmemd_fip_fini_fabric_res -- deinitialize common fabric resources | |
275 | */ | |
276 | static void | |
277 | rpmemd_fip_fini_fabric_res(struct rpmemd_fip *fip) | |
278 | { | |
279 | RPMEMD_FI_CLOSE(fip->pep, "closing passive endpoint"); | |
280 | RPMEMD_FI_CLOSE(fip->eq, "closing event queue"); | |
281 | RPMEMD_FI_CLOSE(fip->domain, "closing fabric access domain"); | |
282 | RPMEMD_FI_CLOSE(fip->fabric, "closing fabric domain"); | |
283 | } | |
284 | ||
285 | /* | |
286 | * rpmemd_fip_init_memory -- initialize memory pool's resources | |
287 | */ | |
288 | static int | |
289 | rpmemd_fip_init_memory(struct rpmemd_fip *fip) | |
290 | { | |
291 | int ret; | |
292 | ||
293 | /* | |
294 | * Register memory region with appropriate access bits: | |
295 | * - FI_REMOTE_READ - remote peer can issue READ operation, | |
296 | * - FI_REMOTE_WRITE - remote peer can issue WRITE operation, | |
297 | */ | |
298 | ret = fi_mr_reg(fip->domain, fip->addr, fip->size, | |
299 | FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 0, 0, | |
300 | &fip->mr, NULL); | |
301 | if (ret) { | |
302 | RPMEMD_FI_ERR(ret, "registering memory"); | |
303 | return -1; | |
304 | } | |
305 | ||
306 | return 0; | |
307 | } | |
308 | ||
309 | /* | |
310 | * rpmemd_fip_fini_memory -- deinitialize memory pool's resources | |
311 | */ | |
312 | static void | |
313 | rpmemd_fip_fini_memory(struct rpmemd_fip *fip) | |
314 | { | |
315 | RPMEMD_FI_CLOSE(fip->mr, "unregistering memory"); | |
316 | } | |
317 | ||
318 | /* | |
319 | * rpmemd_fip_init_ep -- initialize active endpoint | |
320 | */ | |
321 | static int | |
322 | rpmemd_fip_init_ep(struct rpmemd_fip *fip, struct fi_info *info, | |
323 | struct rpmem_fip_lane *lanep) | |
324 | { | |
325 | int ret; | |
326 | ||
327 | info->tx_attr->size = rpmem_fip_wq_size(fip->persist_method, | |
328 | RPMEM_FIP_NODE_SERVER); | |
329 | ||
330 | info->rx_attr->size = rpmem_fip_rx_size(fip->persist_method, | |
331 | RPMEM_FIP_NODE_SERVER); | |
332 | ||
333 | /* create an endpoint from fabric interface info */ | |
334 | ret = fi_endpoint(fip->domain, info, &lanep->ep, NULL); | |
335 | if (ret) { | |
336 | RPMEMD_FI_ERR(ret, "allocating endpoint"); | |
337 | goto err_endpoint; | |
338 | } | |
339 | ||
340 | /* bind event queue to the endpoint */ | |
341 | ret = fi_ep_bind(lanep->ep, &fip->eq->fid, 0); | |
342 | if (ret) { | |
343 | RPMEMD_FI_ERR(ret, "binding event queue to endpoint"); | |
344 | goto err_bind_eq; | |
345 | } | |
346 | ||
347 | /* | |
348 | * Bind completion queue to the endpoint. | |
349 | * Use a single completion queue for outbound and inbound work | |
350 | * requests. Use selective completion implies adding FI_COMPLETE | |
351 | * flag to each WR which needs a completion. | |
352 | */ | |
353 | ret = fi_ep_bind(lanep->ep, &lanep->cq->fid, | |
354 | FI_RECV | FI_TRANSMIT | FI_SELECTIVE_COMPLETION); | |
355 | if (ret) { | |
356 | RPMEMD_FI_ERR(ret, "binding completion queue to endpoint"); | |
357 | goto err_bind_cq; | |
358 | } | |
359 | ||
360 | /* enable the endpoint */ | |
361 | ret = fi_enable(lanep->ep); | |
362 | if (ret) { | |
363 | RPMEMD_FI_ERR(ret, "enabling endpoint"); | |
364 | goto err_enable; | |
365 | } | |
366 | ||
367 | return 0; | |
368 | err_enable: | |
369 | err_bind_cq: | |
370 | err_bind_eq: | |
371 | RPMEMD_FI_CLOSE(lanep->ep, "closing endpoint"); | |
372 | err_endpoint: | |
373 | return -1; | |
374 | } | |
375 | ||
376 | /* | |
377 | * rpmemd_fip_fini_ep -- close endpoint | |
378 | */ | |
379 | static int | |
380 | rpmemd_fip_fini_ep(struct rpmem_fip_lane *lanep) | |
381 | { | |
382 | return RPMEMD_FI_CLOSE(lanep->ep, "closing endpoint"); | |
383 | } | |
384 | ||
385 | /* | |
386 | * rpmemd_fip_post_msg -- post RECV buffer | |
387 | */ | |
388 | static inline int | |
389 | rpmemd_fip_post_msg(struct rpmemd_fip_lane *lanep) | |
390 | { | |
391 | int ret = rpmem_fip_recvmsg(lanep->base.ep, &lanep->recv); | |
392 | if (ret) { | |
393 | RPMEMD_FI_ERR(ret, "posting recv buffer"); | |
394 | return ret; | |
395 | } | |
396 | ||
397 | lanep->recv_posted = 1; | |
398 | ||
399 | return 0; | |
400 | } | |
401 | ||
402 | /* | |
403 | * rpmemd_fip_post_resp -- post SEND buffer | |
404 | */ | |
405 | static inline int | |
406 | rpmemd_fip_post_resp(struct rpmemd_fip_lane *lanep) | |
407 | { | |
408 | int ret = rpmem_fip_sendmsg(lanep->base.ep, &lanep->send, | |
409 | sizeof(struct rpmem_msg_persist_resp)); | |
410 | if (ret) { | |
411 | RPMEMD_FI_ERR(ret, "posting send buffer"); | |
412 | return ret; | |
413 | } | |
414 | ||
415 | lanep->send_posted = 1; | |
416 | ||
417 | return 0; | |
418 | } | |
419 | ||
/*
 * rpmemd_fip_post_common -- post the RECV message of the given lane
 *
 * Returns 0 on success or the error code of posting the buffer.
 * The fip argument is not used.
 */
static int
rpmemd_fip_post_common(struct rpmemd_fip *fip, struct rpmemd_fip_lane *lanep)
{
	/* posting the lane's RECV buffer is all that is needed here */
	return rpmemd_fip_post_msg(lanep);
}
436 | ||
437 | /* | |
438 | * rpmemd_fip_lanes_init -- initialize all lanes | |
439 | */ | |
440 | static int | |
441 | rpmemd_fip_lanes_init(struct rpmemd_fip *fip) | |
442 | { | |
443 | ||
444 | fip->lanes = calloc(fip->nlanes, sizeof(*fip->lanes)); | |
445 | if (!fip->lanes) { | |
446 | RPMEMD_ERR("!allocating lanes"); | |
447 | goto err_alloc; | |
448 | } | |
449 | ||
450 | return 0; | |
451 | err_alloc: | |
452 | return -1; | |
453 | } | |
454 | ||
455 | /* | |
456 | * rpmemd_fip_fini_lanes -- deinitialize all lanes | |
457 | */ | |
458 | static void | |
459 | rpmemd_fip_fini_lanes(struct rpmemd_fip *fip) | |
460 | { | |
461 | free(fip->lanes); | |
462 | } | |
463 | ||
464 | /* | |
465 | * rpmemd_fip_init_common -- initialize common resources | |
466 | */ | |
467 | static int | |
468 | rpmemd_fip_init_common(struct rpmemd_fip *fip) | |
469 | { | |
470 | int ret; | |
471 | ||
472 | /* allocate persist message buffer */ | |
473 | size_t msg_size = fip->nlanes * fip->pmsg_size; | |
474 | fip->pmsg = malloc(msg_size); | |
475 | if (!fip->pmsg) { | |
476 | RPMEMD_LOG(ERR, "!allocating messages buffer"); | |
477 | goto err_msg_malloc; | |
478 | } | |
479 | ||
480 | /* register persist message buffer */ | |
481 | ret = fi_mr_reg(fip->domain, fip->pmsg, msg_size, FI_RECV, | |
482 | 0, 0, 0, &fip->pmsg_mr, NULL); | |
483 | if (ret) { | |
484 | RPMEMD_FI_ERR(ret, "registering messages buffer"); | |
485 | goto err_mr_reg_msg; | |
486 | } | |
487 | ||
488 | /* get persist message buffer's local descriptor */ | |
489 | fip->pmsg_mr_desc = fi_mr_desc(fip->pmsg_mr); | |
490 | ||
491 | /* allocate persist response message buffer */ | |
492 | size_t msg_resp_size = fip->nlanes * | |
493 | sizeof(struct rpmem_msg_persist_resp); | |
494 | fip->pres = malloc(msg_resp_size); | |
495 | if (!fip->pres) { | |
496 | RPMEMD_FI_ERR(ret, "allocating messages response buffer"); | |
497 | goto err_msg_resp_malloc; | |
498 | } | |
499 | ||
500 | /* register persist response message buffer */ | |
501 | ret = fi_mr_reg(fip->domain, fip->pres, msg_resp_size, FI_SEND, | |
502 | 0, 0, 0, &fip->pres_mr, NULL); | |
503 | if (ret) { | |
504 | RPMEMD_FI_ERR(ret, "registering messages " | |
505 | "response buffer"); | |
506 | goto err_mr_reg_msg_resp; | |
507 | } | |
508 | ||
509 | /* get persist message buffer's local descriptor */ | |
510 | fip->pres_mr_desc = fi_mr_desc(fip->pres_mr); | |
511 | ||
512 | /* initialize lanes */ | |
513 | unsigned i; | |
514 | for (i = 0; i < fip->nlanes; i++) { | |
515 | struct rpmemd_fip_lane *lanep = &fip->lanes[i]; | |
516 | ||
517 | /* initialize RECV message */ | |
518 | rpmem_fip_msg_init(&lanep->recv, | |
519 | fip->pmsg_mr_desc, 0, | |
520 | lanep, | |
521 | rpmemd_fip_get_pmsg(fip, i), | |
522 | fip->pmsg_size, | |
523 | FI_COMPLETION); | |
524 | ||
525 | /* initialize SEND message */ | |
526 | rpmem_fip_msg_init(&lanep->send, | |
527 | fip->pres_mr_desc, 0, | |
528 | lanep, | |
529 | &fip->pres[i], | |
530 | sizeof(fip->pres[i]), | |
531 | FI_COMPLETION); | |
532 | } | |
533 | ||
534 | return 0; | |
535 | err_mr_reg_msg_resp: | |
536 | free(fip->pres); | |
537 | err_msg_resp_malloc: | |
538 | RPMEMD_FI_CLOSE(fip->pmsg_mr, | |
539 | "unregistering messages buffer"); | |
540 | err_mr_reg_msg: | |
541 | free(fip->pmsg); | |
542 | err_msg_malloc: | |
543 | return -1; | |
544 | } | |
545 | ||
546 | /* | |
547 | * rpmemd_fip_fini_common -- deinitialize common resources and return last | |
548 | * error code | |
549 | */ | |
550 | static int | |
551 | rpmemd_fip_fini_common(struct rpmemd_fip *fip) | |
552 | { | |
553 | int lret = 0; | |
554 | int ret; | |
555 | ||
556 | ret = RPMEMD_FI_CLOSE(fip->pmsg_mr, | |
557 | "unregistering messages buffer"); | |
558 | if (ret) | |
559 | lret = ret; | |
560 | ||
561 | ret = RPMEMD_FI_CLOSE(fip->pres_mr, | |
562 | "unregistering messages response buffer"); | |
563 | if (ret) | |
564 | lret = ret; | |
565 | ||
566 | free(fip->pmsg); | |
567 | free(fip->pres); | |
568 | ||
569 | return lret; | |
570 | } | |
571 | ||
572 | /* | |
573 | * rpmemd_fip_check_pmsg -- verify persist message | |
574 | */ | |
575 | static inline int | |
576 | rpmemd_fip_check_pmsg(struct rpmemd_fip *fip, struct rpmem_msg_persist *pmsg) | |
577 | { | |
578 | if (pmsg->lane >= fip->nlanes) { | |
579 | RPMEMD_LOG(ERR, "invalid lane number -- %u", pmsg->lane); | |
580 | return -1; | |
581 | } | |
582 | ||
583 | uintptr_t raddr = pmsg->addr; | |
584 | uintptr_t laddr = (uintptr_t)fip->addr; | |
585 | ||
586 | if (raddr < laddr || raddr + pmsg->size > laddr + fip->size) { | |
587 | RPMEMD_LOG(ERR, "invalid address or size requested " | |
588 | "for persist operation (0x%lx, %lu)", | |
589 | raddr, pmsg->size); | |
590 | return -1; | |
591 | } | |
592 | ||
593 | return 0; | |
594 | } | |
595 | ||
596 | /* | |
597 | * rpmemd_fip_process_send -- process FI_SEND completion | |
598 | */ | |
599 | static int | |
600 | rpmemd_fip_process_send(struct rpmemd_fip *fip, struct rpmemd_fip_lane *lanep) | |
601 | { | |
602 | lanep->send_posted = 0; | |
603 | ||
604 | if (lanep->recv_posted) | |
605 | return 0; | |
606 | ||
607 | struct rpmem_msg_persist_resp *pres = | |
608 | rpmem_fip_msg_get_pres(&lanep->send); | |
609 | ||
610 | *pres = lanep->resp; | |
611 | ||
612 | int ret; | |
613 | ||
614 | /* post lane's RECV buffer */ | |
615 | ret = rpmemd_fip_post_msg(lanep); | |
616 | if (unlikely(ret)) | |
617 | goto err; | |
618 | ||
619 | /* post lane's SEND buffer */ | |
620 | ret = rpmemd_fip_post_resp(lanep); | |
621 | err: | |
622 | return ret; | |
623 | } | |
624 | ||
625 | /* | |
626 | * rpmemd_fip_process_recv -- process FI_RECV completion | |
627 | */ | |
628 | static int | |
629 | rpmemd_fip_process_recv(struct rpmemd_fip *fip, struct rpmemd_fip_lane *lanep) | |
630 | { | |
631 | int ret = 0; | |
632 | ||
633 | lanep->recv_posted = 0; | |
634 | ||
635 | /* | |
636 | * Get persist message and persist message response from appropriate | |
637 | * buffers. The persist message is in lane's RECV buffer and the | |
638 | * persist response message in lane's SEND buffer. | |
639 | */ | |
640 | struct rpmem_msg_persist *pmsg = rpmem_fip_msg_get_pmsg(&lanep->recv); | |
641 | VALGRIND_DO_MAKE_MEM_DEFINED(pmsg, sizeof(*pmsg)); | |
642 | ||
643 | /* verify persist message */ | |
644 | ret = rpmemd_fip_check_pmsg(fip, pmsg); | |
645 | if (unlikely(ret)) | |
646 | goto err; | |
647 | unsigned mode = pmsg->flags & RPMEM_FLUSH_PERSIST_MASK; | |
648 | ||
649 | if (mode == RPMEM_DEEP_PERSIST) { | |
650 | fip->deep_persist((void *)pmsg->addr, pmsg->size, fip->ctx); | |
651 | } else if (mode == RPMEM_PERSIST_SEND) { | |
652 | fip->memcpy_persist((void *)pmsg->addr, pmsg->data, pmsg->size); | |
653 | } else { | |
654 | fip->persist((void *)pmsg->addr, pmsg->size); | |
655 | } | |
656 | ||
657 | struct rpmem_msg_persist_resp *pres = lanep->send_posted ? | |
658 | &lanep->resp : rpmem_fip_msg_get_pres(&lanep->send); | |
659 | ||
660 | /* return back the lane id */ | |
661 | pres->lane = pmsg->lane; | |
662 | ||
663 | if (!lanep->send_posted) { | |
664 | /* post lane's RECV buffer */ | |
665 | ret = rpmemd_fip_post_msg(lanep); | |
666 | if (unlikely(ret)) | |
667 | goto err; | |
668 | ||
669 | /* post lane's SEND buffer */ | |
670 | ret = rpmemd_fip_post_resp(lanep); | |
671 | } | |
672 | ||
673 | err: | |
674 | return ret; | |
675 | } | |
676 | ||
677 | /* | |
678 | * rpmemd_fip_cq_read -- wait for specific events on completion queue | |
679 | */ | |
680 | static int | |
681 | rpmemd_fip_cq_read(struct rpmemd_fip *fip, struct fid_cq *cq, | |
682 | struct rpmemd_fip_lane **lanep, uint64_t *event, uint64_t event_mask) | |
683 | { | |
684 | struct fi_cq_err_entry err; | |
685 | struct fi_cq_msg_entry cq_entry; | |
686 | const char *str_err; | |
687 | ssize_t sret; | |
688 | int ret; | |
689 | ||
690 | while (!fip->closing) { | |
691 | sret = fi_cq_sread(cq, &cq_entry, 1, NULL, | |
692 | RPMEM_FIP_CQ_WAIT_MS); | |
693 | ||
694 | if (unlikely(fip->closing)) | |
695 | break; | |
696 | ||
697 | if (unlikely(sret == -FI_EAGAIN || sret == 0)) | |
698 | continue; | |
699 | ||
700 | if (unlikely(sret < 0)) { | |
701 | ret = (int)sret; | |
702 | goto err_cq_read; | |
703 | } | |
704 | ||
705 | if (!(cq_entry.flags & event_mask)) { | |
706 | RPMEMD_LOG(ERR, "unexpected event received %lx", | |
707 | cq_entry.flags); | |
708 | ret = -1; | |
709 | goto err; | |
710 | } | |
711 | ||
712 | if (!cq_entry.op_context) { | |
713 | RPMEMD_LOG(ERR, "null context received"); | |
714 | ret = -1; | |
715 | goto err; | |
716 | } | |
717 | ||
718 | *event = cq_entry.flags & event_mask; | |
719 | *lanep = cq_entry.op_context; | |
720 | ||
721 | return 0; | |
722 | } | |
723 | ||
724 | return 0; | |
725 | err_cq_read: | |
726 | sret = fi_cq_readerr(cq, &err, 0); | |
727 | if (sret < 0) { | |
728 | RPMEMD_FI_ERR((int)sret, "error reading from completion queue: " | |
729 | "cannot read error from completion queue"); | |
730 | goto err; | |
731 | } | |
732 | ||
733 | str_err = fi_cq_strerror(cq, err.prov_errno, NULL, NULL, 0); | |
734 | RPMEMD_LOG(ERR, "error reading from completion queue: %s", str_err); | |
735 | err: | |
736 | return ret; | |
737 | } | |
738 | ||
739 | /* | |
740 | * rpmemd_fip_thread -- thread callback which processes persist | |
741 | * operation | |
742 | */ | |
743 | static void * | |
744 | rpmemd_fip_thread(void *arg) | |
745 | { | |
746 | struct rpmemd_fip_thread *thread = arg; | |
747 | struct rpmemd_fip *fip = thread->fip; | |
748 | struct rpmemd_fip_lane *lanep = NULL; | |
749 | uint64_t event = 0; | |
750 | int ret = 0; | |
751 | ||
752 | while (!fip->closing) { | |
753 | ret = rpmemd_fip_cq_read(fip, thread->cq, &lanep, &event, | |
754 | FI_SEND|FI_RECV); | |
755 | if (ret) | |
756 | goto err; | |
757 | ||
758 | if (unlikely(fip->closing)) | |
759 | break; | |
760 | ||
761 | RPMEMD_ASSERT(lanep != NULL); | |
762 | if (event & FI_RECV) | |
763 | ret = rpmemd_fip_process_recv(fip, lanep); | |
764 | else if (event & FI_SEND) | |
765 | ret = rpmemd_fip_process_send(fip, lanep); | |
766 | if (ret) | |
767 | goto err; | |
768 | } | |
769 | ||
770 | return 0; | |
771 | err: | |
772 | return (void *)(uintptr_t)ret; | |
773 | } | |
774 | ||
775 | /* | |
776 | * rpmemd_fip_get_def_nthreads -- get default number of threads for given | |
777 | * persistency method | |
778 | */ | |
779 | static size_t | |
780 | rpmemd_fip_get_def_nthreads(struct rpmemd_fip *fip) | |
781 | { | |
782 | RPMEMD_ASSERT(fip->nlanes > 0); | |
783 | switch (fip->persist_method) { | |
784 | case RPMEM_PM_APM: | |
785 | case RPMEM_PM_GPSPM: | |
786 | return fip->nlanes; | |
787 | default: | |
788 | RPMEMD_ASSERT(0); | |
789 | return 0; | |
790 | } | |
791 | } | |
792 | ||
793 | /* | |
794 | * rpmemd_fip_set_attr -- save required attributes in rpmemd_fip handle | |
795 | */ | |
796 | static void | |
797 | rpmemd_fip_set_attr(struct rpmemd_fip *fip, struct rpmemd_fip_attr *attr) | |
798 | { | |
799 | fip->addr = attr->addr; | |
800 | fip->size = attr->size; | |
801 | fip->persist_method = attr->persist_method; | |
802 | fip->persist = attr->persist; | |
803 | fip->memcpy_persist = attr->memcpy_persist; | |
804 | fip->deep_persist = attr->deep_persist; | |
805 | fip->ctx = attr->ctx; | |
806 | fip->buff_size = attr->buff_size; | |
807 | fip->pmsg_size = roundup(sizeof(struct rpmem_msg_persist) + | |
808 | fip->buff_size, (size_t)64); | |
809 | ||
810 | size_t max_nlanes = rpmem_fip_max_nlanes(fip->fi); | |
811 | RPMEMD_ASSERT(max_nlanes < UINT_MAX); | |
812 | fip->nlanes = min((unsigned)max_nlanes, attr->nlanes); | |
813 | ||
814 | if (attr->nthreads) { | |
815 | fip->nthreads = attr->nthreads; | |
816 | } else { | |
817 | /* use default */ | |
818 | fip->nthreads = rpmemd_fip_get_def_nthreads(fip); | |
819 | } | |
820 | ||
821 | fip->lanes_per_thread = (fip->nlanes - 1) / fip->nthreads + 1; | |
822 | size_t cq_size_per_lane = rpmem_fip_cq_size(fip->persist_method, | |
823 | RPMEM_FIP_NODE_SERVER); | |
824 | ||
825 | fip->cq_size = fip->lanes_per_thread * cq_size_per_lane; | |
826 | ||
827 | RPMEMD_ASSERT(fip->persist_method < MAX_RPMEM_PM); | |
828 | } | |
829 | ||
830 | /* | |
831 | * rpmemd_fip_init_thread -- init worker thread | |
832 | */ | |
833 | static int | |
834 | rpmemd_fip_init_thread(struct rpmemd_fip *fip, struct rpmemd_fip_thread *thread) | |
835 | { | |
836 | thread->fip = fip; | |
837 | thread->lanes = malloc(fip->lanes_per_thread * sizeof(*thread->lanes)); | |
838 | if (!thread->lanes) { | |
839 | RPMEMD_LOG(ERR, "!allocating thread lanes"); | |
840 | goto err_alloc_lanes; | |
841 | } | |
842 | ||
843 | struct fi_cq_attr cq_attr = { | |
844 | .size = fip->cq_size, | |
845 | .flags = 0, | |
846 | .format = FI_CQ_FORMAT_MSG, /* need context and flags */ | |
847 | .wait_obj = FI_WAIT_UNSPEC, | |
848 | .signaling_vector = 0, | |
849 | .wait_cond = FI_CQ_COND_NONE, | |
850 | .wait_set = NULL, | |
851 | }; | |
852 | ||
853 | int ret = fi_cq_open(fip->domain, &cq_attr, &thread->cq, NULL); | |
854 | if (ret) { | |
855 | RPMEMD_FI_ERR(ret, "opening completion queue"); | |
856 | goto err_cq_open; | |
857 | } | |
858 | ||
859 | return 0; | |
860 | err_cq_open: | |
861 | free(thread->lanes); | |
862 | err_alloc_lanes: | |
863 | return -1; | |
864 | } | |
865 | ||
866 | /* | |
867 | * rpmemd_fip_fini_thread -- deinitialize worker thread | |
868 | */ | |
869 | static void | |
870 | rpmemd_fip_fini_thread(struct rpmemd_fip *fip, struct rpmemd_fip_thread *thread) | |
871 | { | |
872 | RPMEMD_FI_CLOSE(thread->cq, "closing completion queue"); | |
873 | free(thread->lanes); | |
874 | } | |
875 | ||
876 | /* | |
877 | * rpmemd_fip_init_threads -- initialize worker threads | |
878 | */ | |
879 | static int | |
880 | rpmemd_fip_init_threads(struct rpmemd_fip *fip) | |
881 | { | |
882 | RPMEMD_ASSERT(fip->lanes != NULL); | |
883 | RPMEMD_ASSERT(fip->nthreads > 0); | |
884 | ||
885 | fip->threads = calloc(fip->nthreads, sizeof(*fip->threads)); | |
886 | if (!fip->threads) { | |
887 | RPMEMD_LOG(ERR, "!allocating threads"); | |
888 | goto err_alloc_threads; | |
889 | } | |
890 | ||
891 | int ret; | |
892 | size_t i; | |
893 | for (i = 0; i < fip->nthreads; i++) { | |
894 | ret = rpmemd_fip_init_thread(fip, &fip->threads[i]); | |
895 | if (ret) { | |
896 | RPMEMD_LOG(ERR, "!initializing thread %zu", i); | |
897 | goto err_init_thread; | |
898 | } | |
899 | } | |
900 | ||
901 | for (size_t i = 0; i < fip->nlanes; i++) { | |
902 | size_t w = i % fip->nthreads; | |
903 | struct rpmemd_fip_thread *thread = &fip->threads[w]; | |
904 | fip->lanes[i].base.cq = thread->cq; | |
905 | thread->lanes[thread->nlanes++] = &fip->lanes[i]; | |
906 | } | |
907 | ||
908 | return 0; | |
909 | err_init_thread: | |
910 | for (size_t j = 0; j < i; j++) | |
911 | rpmemd_fip_fini_thread(fip, &fip->threads[j]); | |
912 | free(fip->threads); | |
913 | err_alloc_threads: | |
914 | return -1; | |
915 | } | |
916 | ||
917 | /* | |
918 | * rpmemd_fip_fini_threads -- deinitialize worker threads | |
919 | */ | |
920 | static void | |
921 | rpmemd_fip_fini_threads(struct rpmemd_fip *fip) | |
922 | { | |
923 | for (size_t i = 0; i < fip->nthreads; i++) | |
924 | rpmemd_fip_fini_thread(fip, &fip->threads[i]); | |
925 | free(fip->threads); | |
926 | } | |
927 | ||
928 | /* | |
929 | * rpmemd_fip_init -- initialize fabric provider | |
930 | */ | |
931 | struct rpmemd_fip * | |
932 | rpmemd_fip_init(const char *node, const char *service, | |
933 | struct rpmemd_fip_attr *attr, struct rpmem_resp_attr *resp, | |
934 | enum rpmem_err *err) | |
935 | { | |
936 | int ret; | |
937 | ||
938 | RPMEMD_ASSERT(resp); | |
939 | RPMEMD_ASSERT(err); | |
940 | RPMEMD_ASSERT(attr); | |
941 | RPMEMD_ASSERT(attr->persist); | |
942 | ||
943 | struct rpmemd_fip *fip = calloc(1, sizeof(*fip)); | |
944 | if (!fip) { | |
945 | RPMEMD_LOG(ERR, "!allocating fabric handle"); | |
946 | *err = RPMEM_ERR_FATAL; | |
947 | return NULL; | |
948 | } | |
949 | ||
950 | ret = rpmemd_fip_getinfo(fip, service, node, attr->provider); | |
951 | if (ret) { | |
952 | *err = RPMEM_ERR_BADPROVIDER; | |
953 | goto err_getinfo; | |
954 | } | |
955 | ||
956 | rpmemd_fip_set_attr(fip, attr); | |
957 | ||
958 | ret = rpmemd_fip_init_fabric_res(fip); | |
959 | if (ret) { | |
960 | *err = RPMEM_ERR_FATAL; | |
961 | goto err_init_fabric_res; | |
962 | } | |
963 | ||
964 | ret = rpmemd_fip_init_memory(fip); | |
965 | if (ret) { | |
966 | *err = RPMEM_ERR_FATAL; | |
967 | goto err_init_memory; | |
968 | } | |
969 | ||
970 | ret = rpmemd_fip_lanes_init(fip); | |
971 | if (ret) { | |
972 | *err = RPMEM_ERR_FATAL; | |
973 | goto err_init_lanes; | |
974 | } | |
975 | ||
976 | ret = rpmemd_fip_init_threads(fip); | |
977 | if (ret) { | |
978 | *err = RPMEM_ERR_FATAL; | |
979 | goto err_init_threads; | |
980 | } | |
981 | ||
982 | ret = rpmemd_fip_init_common(fip); | |
983 | if (ret) { | |
984 | *err = RPMEM_ERR_FATAL; | |
985 | goto err_init; | |
986 | } | |
987 | ||
988 | ret = fi_listen(fip->pep); | |
989 | if (ret) { | |
990 | *err = RPMEM_ERR_FATAL_CONN; | |
991 | goto err_fi_listen; | |
992 | } | |
993 | ||
994 | ret = rpmemd_fip_set_resp(fip, resp); | |
995 | if (ret) { | |
996 | *err = RPMEM_ERR_FATAL; | |
997 | goto err_set_resp; | |
998 | } | |
999 | ||
1000 | return fip; | |
1001 | err_set_resp: | |
1002 | RPMEMD_FI_CLOSE(fip->pep, "closing passive endpoint"); | |
1003 | err_fi_listen: | |
1004 | rpmemd_fip_fini_common(fip); | |
1005 | err_init: | |
1006 | rpmemd_fip_fini_threads(fip); | |
1007 | err_init_threads: | |
1008 | rpmemd_fip_fini_lanes(fip); | |
1009 | err_init_lanes: | |
1010 | rpmemd_fip_fini_memory(fip); | |
1011 | err_init_memory: | |
1012 | rpmemd_fip_fini_fabric_res(fip); | |
1013 | err_init_fabric_res: | |
1014 | fi_freeinfo(fip->fi); | |
1015 | err_getinfo: | |
1016 | free(fip); | |
1017 | return NULL; | |
1018 | } | |
1019 | ||
1020 | /* | |
1021 | * rpmemd_fip_fini -- deinitialize fabric provider | |
1022 | */ | |
1023 | void | |
1024 | rpmemd_fip_fini(struct rpmemd_fip *fip) | |
1025 | { | |
1026 | rpmemd_fip_fini_common(fip); | |
1027 | rpmemd_fip_fini_threads(fip); | |
1028 | rpmemd_fip_fini_lanes(fip); | |
1029 | rpmemd_fip_fini_memory(fip); | |
1030 | rpmemd_fip_fini_fabric_res(fip); | |
1031 | fi_freeinfo(fip->fi); | |
1032 | free(fip); | |
1033 | } | |
1034 | ||
1035 | /* | |
1036 | * rpmemd_fip_accept_one -- accept a single connection | |
1037 | */ | |
1038 | static int | |
1039 | rpmemd_fip_accept_one(struct rpmemd_fip *fip, | |
1040 | struct fi_info *info, struct rpmemd_fip_lane *lanep) | |
1041 | { | |
1042 | int ret; | |
1043 | ||
1044 | ret = rpmemd_fip_init_ep(fip, info, &lanep->base); | |
1045 | if (ret) | |
1046 | goto err_init_ep; | |
1047 | ||
1048 | ret = rpmemd_fip_post_common(fip, lanep); | |
1049 | if (ret) | |
1050 | goto err_post; | |
1051 | ||
1052 | ret = fi_accept(lanep->base.ep, NULL, 0); | |
1053 | if (ret) { | |
1054 | RPMEMD_FI_ERR(ret, "accepting connection request"); | |
1055 | goto err_accept; | |
1056 | } | |
1057 | ||
1058 | fi_freeinfo(info); | |
1059 | ||
1060 | return 0; | |
1061 | err_accept: | |
1062 | err_post: | |
1063 | rpmemd_fip_fini_ep(&lanep->base); | |
1064 | err_init_ep: | |
1065 | fi_freeinfo(info); | |
1066 | return -1; | |
1067 | } | |
1068 | ||
1069 | /* | |
1070 | * rpmemd_fip_accept -- accept a single connection request | |
1071 | */ | |
1072 | int | |
1073 | rpmemd_fip_accept(struct rpmemd_fip *fip, int timeout) | |
1074 | { | |
1075 | int ret; | |
1076 | struct fi_eq_cm_entry entry; | |
1077 | uint32_t event; | |
1078 | unsigned nreq = 0; /* number of connection requests */ | |
1079 | unsigned ncon = 0; /* number of connected endpoints */ | |
1080 | int connecting = 1; | |
1081 | ||
1082 | while (connecting && (nreq < fip->nlanes || ncon < fip->nlanes)) { | |
1083 | ret = rpmem_fip_read_eq(fip->eq, &entry, | |
1084 | &event, timeout); | |
1085 | if (ret) | |
1086 | goto err_read_eq; | |
1087 | ||
1088 | switch (event) { | |
1089 | case FI_CONNREQ: | |
1090 | ret = rpmemd_fip_accept_one(fip, entry.info, | |
1091 | &fip->lanes[nreq]); | |
1092 | if (ret) | |
1093 | goto err_accept_one; | |
1094 | nreq++; | |
1095 | break; | |
1096 | case FI_CONNECTED: | |
1097 | ncon++; | |
1098 | break; | |
1099 | case FI_SHUTDOWN: | |
1100 | connecting = 0; | |
1101 | break; | |
1102 | default: | |
1103 | RPMEMD_ERR("unexpected event received (%u)", event); | |
1104 | goto err_read_eq; | |
1105 | ||
1106 | } | |
1107 | } | |
1108 | ||
1109 | return 0; | |
1110 | err_accept_one: | |
1111 | err_read_eq: | |
1112 | return -1; | |
1113 | } | |
1114 | ||
1115 | /* | |
1116 | * rpmemd_fip_wait_close -- wait specified time for connection closed event | |
1117 | */ | |
1118 | int | |
1119 | rpmemd_fip_wait_close(struct rpmemd_fip *fip, int timeout) | |
1120 | { | |
1121 | struct fi_eq_cm_entry entry; | |
1122 | int lret = 0; | |
1123 | uint32_t event; | |
1124 | int ret; | |
1125 | ||
1126 | for (unsigned i = 0; i < fip->nlanes; i++) { | |
1127 | ret = rpmem_fip_read_eq(fip->eq, &entry, &event, timeout); | |
1128 | if (ret) | |
1129 | lret = ret; | |
1130 | if (event != FI_SHUTDOWN) { | |
1131 | RPMEMD_ERR("unexpected event received " | |
1132 | "(is %u expected %u)", | |
1133 | event, FI_SHUTDOWN); | |
1134 | errno = EINVAL; | |
1135 | lret = -1; | |
1136 | } | |
1137 | } | |
1138 | ||
1139 | return lret; | |
1140 | } | |
1141 | ||
1142 | /* | |
1143 | * rpmemd_fip_close -- close the connection | |
1144 | */ | |
1145 | int | |
1146 | rpmemd_fip_close(struct rpmemd_fip *fip) | |
1147 | { | |
1148 | int lret = 0; | |
1149 | int ret; | |
1150 | ||
1151 | for (unsigned i = 0; i < fip->nlanes; i++) { | |
1152 | ret = rpmemd_fip_fini_ep(&fip->lanes[i].base); | |
1153 | if (ret) | |
1154 | lret = ret; | |
1155 | } | |
1156 | ||
1157 | return lret; | |
1158 | } | |
1159 | ||
1160 | /* | |
1161 | * rpmemd_fip_process_start -- start processing | |
1162 | */ | |
1163 | int | |
1164 | rpmemd_fip_process_start(struct rpmemd_fip *fip) | |
1165 | { | |
1166 | unsigned i; | |
1167 | for (i = 0; i < fip->nthreads; i++) { | |
1168 | errno = os_thread_create(&fip->threads[i].thread, NULL, | |
1169 | rpmemd_fip_thread, &fip->threads[i]); | |
1170 | if (errno) { | |
1171 | RPMEMD_ERR("!running thread thread"); | |
1172 | goto err_thread_create; | |
1173 | } | |
1174 | } | |
1175 | ||
1176 | return 0; | |
1177 | err_thread_create: | |
1178 | return -1; | |
1179 | } | |
1180 | ||
1181 | /* | |
1182 | * rpmemd_fip_process_stop -- stop processing | |
1183 | */ | |
1184 | int | |
1185 | rpmemd_fip_process_stop(struct rpmemd_fip *fip) | |
1186 | { | |
1187 | /* this stops all threads */ | |
1188 | util_fetch_and_or32(&fip->closing, 1); | |
1189 | int ret; | |
1190 | int lret = 0; | |
1191 | ||
1192 | for (size_t i = 0; i < fip->nthreads; i++) { | |
1193 | struct rpmemd_fip_thread *thread = &fip->threads[i]; | |
1194 | ret = fi_cq_signal(thread->cq); | |
1195 | if (ret) { | |
1196 | RPMEMD_FI_ERR(ret, "sending signal to CQ"); | |
1197 | lret = ret; | |
1198 | } | |
1199 | void *tret; | |
1200 | errno = os_thread_join(&thread->thread, &tret); | |
1201 | if (errno) { | |
1202 | RPMEMD_LOG(ERR, "!joining cq thread"); | |
1203 | lret = -1; | |
1204 | } else { | |
1205 | ret = (int)(uintptr_t)tret; | |
1206 | if (ret) { | |
1207 | RPMEMD_LOG(ERR, | |
1208 | "cq thread failed with code -- %d", | |
1209 | ret); | |
1210 | lret = ret; | |
1211 | } | |
1212 | } | |
1213 | } | |
1214 | ||
1215 | return lret; | |
1216 | } |