// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2016-2020, Intel Corporation */

/*
 * rpmemd_fip.c -- rpmemd libfabric provider module source file
 */

#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_errno.h>

#include "rpmemd_log.h"

#include "rpmem_common.h"
#include "rpmem_proto.h"
#include "rpmem_fip_msg.h"
#include "rpmem_fip_common.h"
#include "rpmemd_fip.h"

#include "os_thread.h"
#include "util.h"
#include "valgrind_internal.h"

#define RPMEMD_FI_ERR(e, fmt, args...)\
	RPMEMD_LOG(ERR, fmt ": %s", ## args, fi_strerror((e)))

#define RPMEMD_FI_CLOSE(f, fmt, args...) (\
{\
	int ret = fi_close(&(f)->fid);\
	if (ret)\
		RPMEMD_FI_ERR(ret, fmt, ## args);\
	ret;\
})
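
/*
 * Usage sketch: RPMEMD_FI_CLOSE is a GCC statement expression, so it
 * evaluates to fi_close()'s return value and may be used wherever an int
 * is expected (the "res" variable below is hypothetical):
 *
 *	int res = RPMEMD_FI_CLOSE(fip->mr, "unregistering memory");
 *	if (res)
 *		return res;	// error already logged by the macro
 */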

/*
 * rpmem_fip_lane -- base lane structure
 */
struct rpmem_fip_lane {
	struct fid_ep *ep;
	struct fid_cq *cq;
};

/*
 * rpmemd_fip_lane -- daemon's lane
 */
struct rpmemd_fip_lane {
	struct rpmem_fip_lane base;	/* lane base structure */
	struct rpmem_fip_msg recv;	/* RECV message */
	struct rpmem_fip_msg send;	/* SEND message */
	struct rpmem_msg_persist_resp resp; /* persist response msg buffer */
	int send_posted;	/* send buffer has been posted */
	int recv_posted;	/* recv buffer has been posted */
};

/*
 * rpmemd_fip_thread -- thread context
 */
struct rpmemd_fip_thread {
	struct rpmemd_fip *fip;		/* main context */
	os_thread_t thread;		/* thread structure */
	struct fid_cq *cq;		/* per-thread completion queue */
	struct rpmemd_fip_lane **lanes;	/* lanes processed by this thread */
	size_t nlanes;		/* number of lanes processed by this thread */
};

/*
 * rpmemd_fip -- main context of rpmemd_fip
 */
struct rpmemd_fip {
	struct fi_info *fi;		/* fabric interface information */
	struct fid_fabric *fabric;	/* fabric domain */
	struct fid_domain *domain;	/* fabric protection domain */
	struct fid_eq *eq;		/* event queue */
	struct fid_pep *pep;		/* passive endpoint - listener */
	struct fid_mr *mr;		/* memory region for pool */

	int (*persist)(const void *addr, size_t len);	/* persist function */
	void *(*memcpy_persist)(void *pmemdest, const void *src, size_t len);
	int (*deep_persist)(const void *addr, size_t len, void *ctx);
	void *ctx;
	void *addr;			/* pool's address */
	size_t size;			/* size of the pool */
	enum rpmem_persist_method persist_method;

	volatile int closing;	/* flag for closing background threads */
	unsigned nlanes;	/* number of lanes */
	size_t nthreads;	/* number of threads for processing */
	size_t cq_size;		/* size of completion queue */
	size_t lanes_per_thread;	/* number of lanes per thread */
	size_t buff_size;	/* size of buffer for inlined data */

	struct rpmemd_fip_lane *lanes;
	struct rpmem_fip_lane rd_lane;	/* lane for read operation */

	void *pmsg;		/* persist message buffer */
	size_t pmsg_size;	/* persist message buffer size including alignment */
	struct fid_mr *pmsg_mr;	/* persist message memory region */
	void *pmsg_mr_desc;	/* persist message local descriptor */

	struct rpmem_msg_persist_resp *pres; /* persist response buffer */
	struct fid_mr *pres_mr;	/* persist response memory region */
	void *pres_mr_desc;	/* persist response local descriptor */

	struct rpmemd_fip_thread *threads;
};
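
/*
 * A rough ownership sketch of the structures above:
 *
 *	rpmemd_fip
 *	+-- lanes[nlanes]	one RECV/SEND message pair and a response
 *	|			buffer per lane
 *	+-- threads[nthreads]	one completion queue per thread; lanes are
 *				assigned to threads round-robin (see
 *				rpmemd_fip_init_threads())
 */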

/*
 * rpmemd_fip_get_pmsg -- return persist message buffer for the given lane
 */
static inline struct rpmem_msg_persist *
rpmemd_fip_get_pmsg(struct rpmemd_fip *fip, size_t idx)
{
	return (struct rpmem_msg_persist *)
		((uintptr_t)fip->pmsg + idx * fip->pmsg_size);
}
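
/*
 * Layout sketch: the persist message buffers form one contiguous allocation
 * of nlanes slots, each pmsg_size bytes (message header plus buff_size of
 * inlined data, rounded up to 64 bytes in rpmemd_fip_set_attr()). Assuming,
 * for illustration only, buff_size == 8192 and a 56-byte message header:
 *
 *	pmsg_size = roundup(56 + 8192, 64) = 8256
 *	slot i starts at (uintptr_t)pmsg + i * 8256
 */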

/*
 * rpmemd_fip_getinfo -- obtain fabric interface information
 */
static int
rpmemd_fip_getinfo(struct rpmemd_fip *fip, const char *service,
	const char *node, enum rpmem_provider provider)
{
	int ret;

	struct fi_info *hints = rpmem_fip_get_hints(provider);
	if (!hints) {
		RPMEMD_LOG(ERR, "getting fabric interface hints");
		ret = -1;
		goto err_fi_get_hints;
	}

	ret = fi_getinfo(RPMEM_FIVERSION, node, service, FI_SOURCE,
			hints, &fip->fi);
	if (ret) {
		RPMEMD_FI_ERR(ret, "getting fabric interface information");
		goto err_fi_getinfo;
	}

	rpmem_fip_print_info(fip->fi);

	fi_freeinfo(hints);
	return 0;
err_fi_getinfo:
	fi_freeinfo(hints);
err_fi_get_hints:
	return ret;
}

/*
 * rpmemd_fip_set_resp -- fill the response structure
 */
static int
rpmemd_fip_set_resp(struct rpmemd_fip *fip, struct rpmem_resp_attr *resp)
{
	int ret;
	if (fip->fi->addr_format == FI_SOCKADDR_IN) {
		struct sockaddr_in addr_in;
		size_t addrlen = sizeof(addr_in);

		ret = fi_getname(&fip->pep->fid, &addr_in, &addrlen);
		if (ret) {
			RPMEMD_FI_ERR(ret, "getting local endpoint address");
			goto err_fi_getname;
		}

		if (!addr_in.sin_port) {
			RPMEMD_LOG(ERR, "dynamic allocation of port failed");
			goto err_port;
		}

		resp->port = htons(addr_in.sin_port);
	} else if (fip->fi->addr_format == FI_SOCKADDR_IN6) {
		struct sockaddr_in6 addr_in6;
		size_t addrlen = sizeof(addr_in6);

		ret = fi_getname(&fip->pep->fid, &addr_in6, &addrlen);
		if (ret) {
			RPMEMD_FI_ERR(ret, "getting local endpoint address");
			goto err_fi_getname;
		}

		if (!addr_in6.sin6_port) {
			RPMEMD_LOG(ERR, "dynamic allocation of port failed");
			goto err_port;
		}

		resp->port = htons(addr_in6.sin6_port);
	} else {
		RPMEMD_LOG(ERR, "invalid address format");
		return -1;
	}

	resp->rkey = fi_mr_key(fip->mr);
	resp->persist_method = fip->persist_method;
	resp->raddr = (uint64_t)fip->addr;
	resp->nlanes = fip->nlanes;

	return 0;
err_port:
err_fi_getname:
	return -1;
}

/*
 * rpmemd_fip_init_fabric_res -- initialize common fabric resources
 */
static int
rpmemd_fip_init_fabric_res(struct rpmemd_fip *fip)
{
	int ret;
	ret = fi_fabric(fip->fi->fabric_attr, &fip->fabric, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "opening fabric domain");
		goto err_fi_fabric;
	}

	ret = fi_domain(fip->fabric, fip->fi, &fip->domain, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "opening fabric access domain");
		goto err_fi_domain;
	}

	struct fi_eq_attr eq_attr = {
		.size = 0,	/* use default */
		.flags = 0,
		.wait_obj = FI_WAIT_UNSPEC,
		.signaling_vector = 0,
		.wait_set = NULL,
	};

	ret = fi_eq_open(fip->fabric, &eq_attr, &fip->eq, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "opening event queue");
		goto err_eq_open;
	}

	ret = fi_passive_ep(fip->fabric, fip->fi, &fip->pep, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "allocating passive endpoint");
		goto err_pep;
	}

	ret = fi_pep_bind(fip->pep, &fip->eq->fid, 0);
	if (ret) {
		RPMEMD_FI_ERR(ret, "binding event queue to passive endpoint");
		goto err_pep_bind_eq;
	}

	return 0;
err_pep_bind_eq:
	RPMEMD_FI_CLOSE(fip->pep, "closing passive endpoint");
err_pep:
	RPMEMD_FI_CLOSE(fip->eq, "closing event queue");
err_eq_open:
	RPMEMD_FI_CLOSE(fip->domain, "closing fabric access domain");
err_fi_domain:
	RPMEMD_FI_CLOSE(fip->fabric, "closing fabric domain");
err_fi_fabric:
	return ret;
}

/*
 * rpmemd_fip_fini_fabric_res -- deinitialize common fabric resources
 */
static void
rpmemd_fip_fini_fabric_res(struct rpmemd_fip *fip)
{
	RPMEMD_FI_CLOSE(fip->pep, "closing passive endpoint");
	RPMEMD_FI_CLOSE(fip->eq, "closing event queue");
	RPMEMD_FI_CLOSE(fip->domain, "closing fabric access domain");
	RPMEMD_FI_CLOSE(fip->fabric, "closing fabric domain");
}

/*
 * rpmemd_fip_init_memory -- initialize memory pool's resources
 */
static int
rpmemd_fip_init_memory(struct rpmemd_fip *fip)
{
	int ret;

	/*
	 * Register the memory region with the appropriate access bits:
	 * - FI_REMOTE_READ  - the remote peer can issue READ operations,
	 * - FI_REMOTE_WRITE - the remote peer can issue WRITE operations.
	 */
	ret = fi_mr_reg(fip->domain, fip->addr, fip->size,
			FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 0, 0,
			&fip->mr, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "registering memory");
		return -1;
	}

	return 0;
}

/*
 * rpmemd_fip_fini_memory -- deinitialize memory pool's resources
 */
static void
rpmemd_fip_fini_memory(struct rpmemd_fip *fip)
{
	RPMEMD_FI_CLOSE(fip->mr, "unregistering memory");
}

/*
 * rpmemd_fip_init_ep -- initialize active endpoint
 */
static int
rpmemd_fip_init_ep(struct rpmemd_fip *fip, struct fi_info *info,
	struct rpmem_fip_lane *lanep)
{
	int ret;

	info->tx_attr->size = rpmem_fip_wq_size(fip->persist_method,
			RPMEM_FIP_NODE_SERVER);

	info->rx_attr->size = rpmem_fip_rx_size(fip->persist_method,
			RPMEM_FIP_NODE_SERVER);

	/* create an endpoint from fabric interface info */
	ret = fi_endpoint(fip->domain, info, &lanep->ep, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "allocating endpoint");
		goto err_endpoint;
	}

	/* bind event queue to the endpoint */
	ret = fi_ep_bind(lanep->ep, &fip->eq->fid, 0);
	if (ret) {
		RPMEMD_FI_ERR(ret, "binding event queue to endpoint");
		goto err_bind_eq;
	}

	/*
	 * Bind completion queue to the endpoint.
	 * Use a single completion queue for both outbound and inbound work
	 * requests. Using selective completion implies adding the
	 * FI_COMPLETION flag to each work request that needs a completion.
	 */
	ret = fi_ep_bind(lanep->ep, &lanep->cq->fid,
			FI_RECV | FI_TRANSMIT | FI_SELECTIVE_COMPLETION);
	if (ret) {
		RPMEMD_FI_ERR(ret, "binding completion queue to endpoint");
		goto err_bind_cq;
	}

	/* enable the endpoint */
	ret = fi_enable(lanep->ep);
	if (ret) {
		RPMEMD_FI_ERR(ret, "enabling endpoint");
		goto err_enable;
	}

	return 0;
err_enable:
err_bind_cq:
err_bind_eq:
	RPMEMD_FI_CLOSE(lanep->ep, "closing endpoint");
err_endpoint:
	return -1;
}
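
/*
 * Completion-flag sketch: with FI_SELECTIVE_COMPLETION bound above, a work
 * request produces a CQ entry only when posted with the FI_COMPLETION flag
 * (as rpmem_fip_msg_init() does for this module's RECV/SEND messages):
 *
 *	fi_sendmsg(ep, &msg, FI_COMPLETION);	// reports a completion
 *	fi_sendmsg(ep, &msg, 0);	// completes silently on success
 */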

/*
 * rpmemd_fip_fini_ep -- close endpoint
 */
static int
rpmemd_fip_fini_ep(struct rpmem_fip_lane *lanep)
{
	return RPMEMD_FI_CLOSE(lanep->ep, "closing endpoint");
}

/*
 * rpmemd_fip_post_msg -- post RECV buffer
 */
static inline int
rpmemd_fip_post_msg(struct rpmemd_fip_lane *lanep)
{
	int ret = rpmem_fip_recvmsg(lanep->base.ep, &lanep->recv);
	if (ret) {
		RPMEMD_FI_ERR(ret, "posting recv buffer");
		return ret;
	}

	lanep->recv_posted = 1;

	return 0;
}

/*
 * rpmemd_fip_post_resp -- post SEND buffer
 */
static inline int
rpmemd_fip_post_resp(struct rpmemd_fip_lane *lanep)
{
	int ret = rpmem_fip_sendmsg(lanep->base.ep, &lanep->send,
			sizeof(struct rpmem_msg_persist_resp));
	if (ret) {
		RPMEMD_FI_ERR(ret, "posting send buffer");
		return ret;
	}

	lanep->send_posted = 1;

	return 0;
}

/*
 * rpmemd_fip_post_common -- post the lane's initial RECV message
 */
static int
rpmemd_fip_post_common(struct rpmemd_fip *fip, struct rpmemd_fip_lane *lanep)
{
	int ret = rpmem_fip_recvmsg(lanep->base.ep, &lanep->recv);
	if (ret) {
		RPMEMD_FI_ERR(ret, "posting recv buffer");
		return ret;
	}

	lanep->recv_posted = 1;

	return 0;
}

/*
 * rpmemd_fip_lanes_init -- initialize all lanes
 */
static int
rpmemd_fip_lanes_init(struct rpmemd_fip *fip)
{
	fip->lanes = calloc(fip->nlanes, sizeof(*fip->lanes));
	if (!fip->lanes) {
		RPMEMD_ERR("!allocating lanes");
		goto err_alloc;
	}

	return 0;
err_alloc:
	return -1;
}

/*
 * rpmemd_fip_fini_lanes -- deinitialize all lanes
 */
static void
rpmemd_fip_fini_lanes(struct rpmemd_fip *fip)
{
	free(fip->lanes);
}

/*
 * rpmemd_fip_init_common -- initialize common resources
 */
static int
rpmemd_fip_init_common(struct rpmemd_fip *fip)
{
	int ret;

	/* allocate persist message buffer */
	size_t msg_size = fip->nlanes * fip->pmsg_size;
	fip->pmsg = malloc(msg_size);
	if (!fip->pmsg) {
		RPMEMD_LOG(ERR, "!allocating messages buffer");
		goto err_msg_malloc;
	}

	/* register persist message buffer */
	ret = fi_mr_reg(fip->domain, fip->pmsg, msg_size, FI_RECV,
			0, 0, 0, &fip->pmsg_mr, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "registering messages buffer");
		goto err_mr_reg_msg;
	}

	/* get persist message buffer's local descriptor */
	fip->pmsg_mr_desc = fi_mr_desc(fip->pmsg_mr);

	/* allocate persist response message buffer */
	size_t msg_resp_size = fip->nlanes *
			sizeof(struct rpmem_msg_persist_resp);
	fip->pres = malloc(msg_resp_size);
	if (!fip->pres) {
		RPMEMD_LOG(ERR, "!allocating messages response buffer");
		goto err_msg_resp_malloc;
	}

	/* register persist response message buffer */
	ret = fi_mr_reg(fip->domain, fip->pres, msg_resp_size, FI_SEND,
			0, 0, 0, &fip->pres_mr, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "registering messages response buffer");
		goto err_mr_reg_msg_resp;
	}

	/* get persist response message buffer's local descriptor */
	fip->pres_mr_desc = fi_mr_desc(fip->pres_mr);

	/* initialize lanes */
	unsigned i;
	for (i = 0; i < fip->nlanes; i++) {
		struct rpmemd_fip_lane *lanep = &fip->lanes[i];

		/* initialize RECV message */
		rpmem_fip_msg_init(&lanep->recv,
				fip->pmsg_mr_desc, 0,
				lanep,
				rpmemd_fip_get_pmsg(fip, i),
				fip->pmsg_size,
				FI_COMPLETION);

		/* initialize SEND message */
		rpmem_fip_msg_init(&lanep->send,
				fip->pres_mr_desc, 0,
				lanep,
				&fip->pres[i],
				sizeof(fip->pres[i]),
				FI_COMPLETION);
	}

	return 0;
err_mr_reg_msg_resp:
	free(fip->pres);
err_msg_resp_malloc:
	RPMEMD_FI_CLOSE(fip->pmsg_mr,
			"unregistering messages buffer");
err_mr_reg_msg:
	free(fip->pmsg);
err_msg_malloc:
	return -1;
}

/*
 * rpmemd_fip_fini_common -- deinitialize common resources and return last
 * error code
 */
static int
rpmemd_fip_fini_common(struct rpmemd_fip *fip)
{
	int lret = 0;
	int ret;

	ret = RPMEMD_FI_CLOSE(fip->pmsg_mr,
			"unregistering messages buffer");
	if (ret)
		lret = ret;

	ret = RPMEMD_FI_CLOSE(fip->pres_mr,
			"unregistering messages response buffer");
	if (ret)
		lret = ret;

	free(fip->pmsg);
	free(fip->pres);

	return lret;
}

/*
 * rpmemd_fip_check_pmsg -- verify persist message
 */
static inline int
rpmemd_fip_check_pmsg(struct rpmemd_fip *fip, struct rpmem_msg_persist *pmsg)
{
	if (pmsg->lane >= fip->nlanes) {
		RPMEMD_LOG(ERR, "invalid lane number -- %u", pmsg->lane);
		return -1;
	}

	uintptr_t raddr = pmsg->addr;
	uintptr_t laddr = (uintptr_t)fip->addr;

	if (raddr < laddr || raddr + pmsg->size > laddr + fip->size) {
		RPMEMD_LOG(ERR, "invalid address or size requested "
			"for persist operation (0x%lx, %lu)",
			raddr, pmsg->size);
		return -1;
	}

	return 0;
}
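
/*
 * Bounds-check sketch: a request is accepted only if
 * [pmsg->addr, pmsg->addr + pmsg->size) fits within [addr, addr + size).
 * Assuming, for illustration only, a pool at 0x10000 of size 0x1000:
 *
 *	pmsg->addr = 0x10000, size = 0x200 -> accepted
 *	pmsg->addr = 0x10e00, size = 0x400 -> rejected (ends at 0x11200)
 *	pmsg->addr = 0x0f000, size = 0x100 -> rejected (below pool base)
 */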

/*
 * rpmemd_fip_process_send -- process FI_SEND completion
 */
static int
rpmemd_fip_process_send(struct rpmemd_fip *fip, struct rpmemd_fip_lane *lanep)
{
	lanep->send_posted = 0;

	if (lanep->recv_posted)
		return 0;

	struct rpmem_msg_persist_resp *pres =
		rpmem_fip_msg_get_pres(&lanep->send);

	*pres = lanep->resp;

	int ret;

	/* post lane's RECV buffer */
	ret = rpmemd_fip_post_msg(lanep);
	if (unlikely(ret))
		goto err;

	/* post lane's SEND buffer */
	ret = rpmemd_fip_post_resp(lanep);
err:
	return ret;
}

/*
 * rpmemd_fip_process_recv -- process FI_RECV completion
 */
static int
rpmemd_fip_process_recv(struct rpmemd_fip *fip, struct rpmemd_fip_lane *lanep)
{
	int ret = 0;

	lanep->recv_posted = 0;

	/*
	 * Get the persist message and the persist response message from the
	 * appropriate buffers. The persist message is in the lane's RECV
	 * buffer and the persist response message is in the lane's SEND
	 * buffer.
	 */
	struct rpmem_msg_persist *pmsg = rpmem_fip_msg_get_pmsg(&lanep->recv);
	VALGRIND_DO_MAKE_MEM_DEFINED(pmsg, sizeof(*pmsg));

	/* verify persist message */
	ret = rpmemd_fip_check_pmsg(fip, pmsg);
	if (unlikely(ret))
		goto err;
	unsigned mode = pmsg->flags & RPMEM_FLUSH_PERSIST_MASK;

	if (mode == RPMEM_DEEP_PERSIST) {
		fip->deep_persist((void *)pmsg->addr, pmsg->size, fip->ctx);
	} else if (mode == RPMEM_PERSIST_SEND) {
		fip->memcpy_persist((void *)pmsg->addr, pmsg->data, pmsg->size);
	} else {
		fip->persist((void *)pmsg->addr, pmsg->size);
	}

	struct rpmem_msg_persist_resp *pres = lanep->send_posted ?
		&lanep->resp : rpmem_fip_msg_get_pres(&lanep->send);

	/* send the lane id back to the client */
	pres->lane = pmsg->lane;

	if (!lanep->send_posted) {
		/* post lane's RECV buffer */
		ret = rpmemd_fip_post_msg(lanep);
		if (unlikely(ret))
			goto err;

		/* post lane's SEND buffer */
		ret = rpmemd_fip_post_resp(lanep);
	}

err:
	return ret;
}

/*
 * rpmemd_fip_cq_read -- wait for specific events on completion queue
 */
static int
rpmemd_fip_cq_read(struct rpmemd_fip *fip, struct fid_cq *cq,
	struct rpmemd_fip_lane **lanep, uint64_t *event, uint64_t event_mask)
{
	struct fi_cq_err_entry err;
	struct fi_cq_msg_entry cq_entry;
	const char *str_err;
	ssize_t sret;
	int ret;

	while (!fip->closing) {
		sret = fi_cq_sread(cq, &cq_entry, 1, NULL,
				RPMEM_FIP_CQ_WAIT_MS);

		if (unlikely(fip->closing))
			break;

		if (unlikely(sret == -FI_EAGAIN || sret == 0))
			continue;

		if (unlikely(sret < 0)) {
			ret = (int)sret;
			goto err_cq_read;
		}

		if (!(cq_entry.flags & event_mask)) {
			RPMEMD_LOG(ERR, "unexpected event received %lx",
					cq_entry.flags);
			ret = -1;
			goto err;
		}

		if (!cq_entry.op_context) {
			RPMEMD_LOG(ERR, "null context received");
			ret = -1;
			goto err;
		}

		*event = cq_entry.flags & event_mask;
		*lanep = cq_entry.op_context;

		return 0;
	}

	return 0;
err_cq_read:
	sret = fi_cq_readerr(cq, &err, 0);
	if (sret < 0) {
		RPMEMD_FI_ERR((int)sret, "error reading from completion queue: "
			"cannot read error from completion queue");
		goto err;
	}

	str_err = fi_cq_strerror(cq, err.prov_errno, NULL, NULL, 0);
	RPMEMD_LOG(ERR, "error reading from completion queue: %s", str_err);
err:
	return ret;
}

/*
 * rpmemd_fip_thread -- thread callback which processes persist
 * operations
 */
static void *
rpmemd_fip_thread(void *arg)
{
	struct rpmemd_fip_thread *thread = arg;
	struct rpmemd_fip *fip = thread->fip;
	struct rpmemd_fip_lane *lanep = NULL;
	uint64_t event = 0;
	int ret = 0;

	while (!fip->closing) {
		ret = rpmemd_fip_cq_read(fip, thread->cq, &lanep, &event,
				FI_SEND|FI_RECV);
		if (ret)
			goto err;

		if (unlikely(fip->closing))
			break;

		RPMEMD_ASSERT(lanep != NULL);
		if (event & FI_RECV)
			ret = rpmemd_fip_process_recv(fip, lanep);
		else if (event & FI_SEND)
			ret = rpmemd_fip_process_send(fip, lanep);
		if (ret)
			goto err;
	}

	return NULL;
err:
	return (void *)(uintptr_t)ret;
}

/*
 * rpmemd_fip_get_def_nthreads -- get the default number of threads for a
 * given persistence method
 */
static size_t
rpmemd_fip_get_def_nthreads(struct rpmemd_fip *fip)
{
	RPMEMD_ASSERT(fip->nlanes > 0);
	switch (fip->persist_method) {
	case RPMEM_PM_APM:
	case RPMEM_PM_GPSPM:
		return fip->nlanes;
	default:
		RPMEMD_ASSERT(0);
		return 0;
	}
}

/*
 * rpmemd_fip_set_attr -- save required attributes in rpmemd_fip handle
 */
static void
rpmemd_fip_set_attr(struct rpmemd_fip *fip, struct rpmemd_fip_attr *attr)
{
	fip->addr = attr->addr;
	fip->size = attr->size;
	fip->persist_method = attr->persist_method;
	fip->persist = attr->persist;
	fip->memcpy_persist = attr->memcpy_persist;
	fip->deep_persist = attr->deep_persist;
	fip->ctx = attr->ctx;
	fip->buff_size = attr->buff_size;
	fip->pmsg_size = roundup(sizeof(struct rpmem_msg_persist) +
			fip->buff_size, (size_t)64);

	size_t max_nlanes = rpmem_fip_max_nlanes(fip->fi);
	RPMEMD_ASSERT(max_nlanes < UINT_MAX);
	fip->nlanes = min((unsigned)max_nlanes, attr->nlanes);

	if (attr->nthreads) {
		fip->nthreads = attr->nthreads;
	} else {
		/* use default */
		fip->nthreads = rpmemd_fip_get_def_nthreads(fip);
	}

	fip->lanes_per_thread = (fip->nlanes - 1) / fip->nthreads + 1;
	size_t cq_size_per_lane = rpmem_fip_cq_size(fip->persist_method,
			RPMEM_FIP_NODE_SERVER);

	fip->cq_size = fip->lanes_per_thread * cq_size_per_lane;

	RPMEMD_ASSERT(fip->persist_method < MAX_RPMEM_PM);
}
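
/*
 * Sizing sketch: lanes_per_thread above is a ceiling division, so lanes
 * are spread as evenly as possible and each per-thread CQ is sized for the
 * largest share a single thread may own. Assuming, for illustration only,
 * nlanes == 7 and nthreads == 4:
 *
 *	lanes_per_thread = (7 - 1) / 4 + 1 = 2
 *	cq_size = 2 * cq_size_per_lane
 */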

/*
 * rpmemd_fip_init_thread -- init worker thread
 */
static int
rpmemd_fip_init_thread(struct rpmemd_fip *fip, struct rpmemd_fip_thread *thread)
{
	thread->fip = fip;
	thread->lanes = malloc(fip->lanes_per_thread * sizeof(*thread->lanes));
	if (!thread->lanes) {
		RPMEMD_LOG(ERR, "!allocating thread lanes");
		goto err_alloc_lanes;
	}

	struct fi_cq_attr cq_attr = {
		.size = fip->cq_size,
		.flags = 0,
		.format = FI_CQ_FORMAT_MSG, /* need context and flags */
		.wait_obj = FI_WAIT_UNSPEC,
		.signaling_vector = 0,
		.wait_cond = FI_CQ_COND_NONE,
		.wait_set = NULL,
	};

	int ret = fi_cq_open(fip->domain, &cq_attr, &thread->cq, NULL);
	if (ret) {
		RPMEMD_FI_ERR(ret, "opening completion queue");
		goto err_cq_open;
	}

	return 0;
err_cq_open:
	free(thread->lanes);
err_alloc_lanes:
	return -1;
}

/*
 * rpmemd_fip_fini_thread -- deinitialize worker thread
 */
static void
rpmemd_fip_fini_thread(struct rpmemd_fip *fip, struct rpmemd_fip_thread *thread)
{
	RPMEMD_FI_CLOSE(thread->cq, "closing completion queue");
	free(thread->lanes);
}

/*
 * rpmemd_fip_init_threads -- initialize worker threads
 */
static int
rpmemd_fip_init_threads(struct rpmemd_fip *fip)
{
	RPMEMD_ASSERT(fip->lanes != NULL);
	RPMEMD_ASSERT(fip->nthreads > 0);

	fip->threads = calloc(fip->nthreads, sizeof(*fip->threads));
	if (!fip->threads) {
		RPMEMD_LOG(ERR, "!allocating threads");
		goto err_alloc_threads;
	}

	int ret;
	size_t i;
	for (i = 0; i < fip->nthreads; i++) {
		ret = rpmemd_fip_init_thread(fip, &fip->threads[i]);
		if (ret) {
			RPMEMD_LOG(ERR, "!initializing thread %zu", i);
			goto err_init_thread;
		}
	}

	/* assign lanes to threads round-robin */
	for (size_t i = 0; i < fip->nlanes; i++) {
		size_t w = i % fip->nthreads;
		struct rpmemd_fip_thread *thread = &fip->threads[w];
		fip->lanes[i].base.cq = thread->cq;
		thread->lanes[thread->nlanes++] = &fip->lanes[i];
	}

	return 0;
err_init_thread:
	for (size_t j = 0; j < i; j++)
		rpmemd_fip_fini_thread(fip, &fip->threads[j]);
	free(fip->threads);
err_alloc_threads:
	return -1;
}

/*
 * rpmemd_fip_fini_threads -- deinitialize worker threads
 */
static void
rpmemd_fip_fini_threads(struct rpmemd_fip *fip)
{
	for (size_t i = 0; i < fip->nthreads; i++)
		rpmemd_fip_fini_thread(fip, &fip->threads[i]);
	free(fip->threads);
}

/*
 * rpmemd_fip_init -- initialize fabric provider
 */
struct rpmemd_fip *
rpmemd_fip_init(const char *node, const char *service,
	struct rpmemd_fip_attr *attr, struct rpmem_resp_attr *resp,
	enum rpmem_err *err)
{
	int ret;

	RPMEMD_ASSERT(resp);
	RPMEMD_ASSERT(err);
	RPMEMD_ASSERT(attr);
	RPMEMD_ASSERT(attr->persist);

	struct rpmemd_fip *fip = calloc(1, sizeof(*fip));
	if (!fip) {
		RPMEMD_LOG(ERR, "!allocating fabric handle");
		*err = RPMEM_ERR_FATAL;
		return NULL;
	}

	ret = rpmemd_fip_getinfo(fip, service, node, attr->provider);
	if (ret) {
		*err = RPMEM_ERR_BADPROVIDER;
		goto err_getinfo;
	}

	rpmemd_fip_set_attr(fip, attr);

	ret = rpmemd_fip_init_fabric_res(fip);
	if (ret) {
		*err = RPMEM_ERR_FATAL;
		goto err_init_fabric_res;
	}

	ret = rpmemd_fip_init_memory(fip);
	if (ret) {
		*err = RPMEM_ERR_FATAL;
		goto err_init_memory;
	}

	ret = rpmemd_fip_lanes_init(fip);
	if (ret) {
		*err = RPMEM_ERR_FATAL;
		goto err_init_lanes;
	}

	ret = rpmemd_fip_init_threads(fip);
	if (ret) {
		*err = RPMEM_ERR_FATAL;
		goto err_init_threads;
	}

	ret = rpmemd_fip_init_common(fip);
	if (ret) {
		*err = RPMEM_ERR_FATAL;
		goto err_init;
	}

	ret = fi_listen(fip->pep);
	if (ret) {
		*err = RPMEM_ERR_FATAL_CONN;
		goto err_fi_listen;
	}

	ret = rpmemd_fip_set_resp(fip, resp);
	if (ret) {
		*err = RPMEM_ERR_FATAL;
		goto err_set_resp;
	}

	return fip;
err_set_resp:
	RPMEMD_FI_CLOSE(fip->pep, "closing passive endpoint");
err_fi_listen:
	rpmemd_fip_fini_common(fip);
err_init:
	rpmemd_fip_fini_threads(fip);
err_init_threads:
	rpmemd_fip_fini_lanes(fip);
err_init_lanes:
	rpmemd_fip_fini_memory(fip);
err_init_memory:
	rpmemd_fip_fini_fabric_res(fip);
err_init_fabric_res:
	fi_freeinfo(fip->fi);
err_getinfo:
	free(fip);
	return NULL;
}

/*
 * rpmemd_fip_fini -- deinitialize fabric provider
 */
void
rpmemd_fip_fini(struct rpmemd_fip *fip)
{
	rpmemd_fip_fini_common(fip);
	rpmemd_fip_fini_threads(fip);
	rpmemd_fip_fini_lanes(fip);
	rpmemd_fip_fini_memory(fip);
	rpmemd_fip_fini_fabric_res(fip);
	fi_freeinfo(fip->fi);
	free(fip);
}
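
/*
 * Lifecycle sketch of this module's public interface, roughly as the rpmemd
 * daemon drives it (error handling omitted; the node/service strings and
 * attr contents are placeholders, not a working configuration):
 *
 *	struct rpmemd_fip_attr attr = { ... };
 *	struct rpmem_resp_attr resp;
 *	enum rpmem_err err;
 *
 *	struct rpmemd_fip *fip =
 *		rpmemd_fip_init("localhost", "7636", &attr, &resp, &err);
 *	rpmemd_fip_accept(fip, -1);	// block until lanes connect
 *	rpmemd_fip_process_start(fip);
 *	// ...serve persist requests until the client disconnects...
 *	rpmemd_fip_process_stop(fip);
 *	rpmemd_fip_wait_close(fip, -1);
 *	rpmemd_fip_close(fip);
 *	rpmemd_fip_fini(fip);
 */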

/*
 * rpmemd_fip_accept_one -- accept a single connection
 */
static int
rpmemd_fip_accept_one(struct rpmemd_fip *fip,
	struct fi_info *info, struct rpmemd_fip_lane *lanep)
{
	int ret;

	ret = rpmemd_fip_init_ep(fip, info, &lanep->base);
	if (ret)
		goto err_init_ep;

	ret = rpmemd_fip_post_common(fip, lanep);
	if (ret)
		goto err_post;

	ret = fi_accept(lanep->base.ep, NULL, 0);
	if (ret) {
		RPMEMD_FI_ERR(ret, "accepting connection request");
		goto err_accept;
	}

	fi_freeinfo(info);

	return 0;
err_accept:
err_post:
	rpmemd_fip_fini_ep(&lanep->base);
err_init_ep:
	fi_freeinfo(info);
	return -1;
}
/*
 * rpmemd_fip_accept -- accept connection requests on all lanes
 */
int
rpmemd_fip_accept(struct rpmemd_fip *fip, int timeout)
{
	int ret;
	struct fi_eq_cm_entry entry;
	uint32_t event;
	unsigned nreq = 0;	/* number of connection requests */
	unsigned ncon = 0;	/* number of connected endpoints */
	int connecting = 1;

	while (connecting && (nreq < fip->nlanes || ncon < fip->nlanes)) {
		ret = rpmem_fip_read_eq(fip->eq, &entry,
				&event, timeout);
		if (ret)
			goto err_read_eq;

		switch (event) {
		case FI_CONNREQ:
			ret = rpmemd_fip_accept_one(fip, entry.info,
					&fip->lanes[nreq]);
			if (ret)
				goto err_accept_one;
			nreq++;
			break;
		case FI_CONNECTED:
			ncon++;
			break;
		case FI_SHUTDOWN:
			connecting = 0;
			break;
		default:
			RPMEMD_ERR("unexpected event received (%u)", event);
			goto err_read_eq;
		}
	}

	return 0;
err_accept_one:
err_read_eq:
	return -1;
}

/*
 * rpmemd_fip_wait_close -- wait specified time for connection closed event
 */
int
rpmemd_fip_wait_close(struct rpmemd_fip *fip, int timeout)
{
	struct fi_eq_cm_entry entry;
	int lret = 0;
	uint32_t event;
	int ret;

	for (unsigned i = 0; i < fip->nlanes; i++) {
		ret = rpmem_fip_read_eq(fip->eq, &entry, &event, timeout);
		if (ret)
			lret = ret;
		if (event != FI_SHUTDOWN) {
			RPMEMD_ERR("unexpected event received "
				"(is %u expected %u)",
				event, FI_SHUTDOWN);
			errno = EINVAL;
			lret = -1;
		}
	}

	return lret;
}

/*
 * rpmemd_fip_close -- close the connection
 */
int
rpmemd_fip_close(struct rpmemd_fip *fip)
{
	int lret = 0;
	int ret;

	for (unsigned i = 0; i < fip->nlanes; i++) {
		ret = rpmemd_fip_fini_ep(&fip->lanes[i].base);
		if (ret)
			lret = ret;
	}

	return lret;
}

/*
 * rpmemd_fip_process_start -- start processing
 */
int
rpmemd_fip_process_start(struct rpmemd_fip *fip)
{
	unsigned i;
	for (i = 0; i < fip->nthreads; i++) {
		errno = os_thread_create(&fip->threads[i].thread, NULL,
				rpmemd_fip_thread, &fip->threads[i]);
		if (errno) {
			RPMEMD_ERR("!creating worker thread");
			goto err_thread_create;
		}
	}

	return 0;
err_thread_create:
	return -1;
}

/*
 * rpmemd_fip_process_stop -- stop processing
 */
int
rpmemd_fip_process_stop(struct rpmemd_fip *fip)
{
	/* this stops all threads */
	util_fetch_and_or32(&fip->closing, 1);
	int ret;
	int lret = 0;

	for (size_t i = 0; i < fip->nthreads; i++) {
		struct rpmemd_fip_thread *thread = &fip->threads[i];
		ret = fi_cq_signal(thread->cq);
		if (ret) {
			RPMEMD_FI_ERR(ret, "sending signal to CQ");
			lret = ret;
		}
		void *tret;
		errno = os_thread_join(&thread->thread, &tret);
		if (errno) {
			RPMEMD_LOG(ERR, "!joining cq thread");
			lret = -1;
		} else {
			ret = (int)(uintptr_t)tret;
			if (ret) {
				RPMEMD_LOG(ERR,
					"cq thread failed with code -- %d",
					ret);
				lret = ret;
			}
		}
	}

	return lret;
}