/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
#include <sys/time.h>

#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_random.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "malloc_elem.h"
#include "malloc_heap.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
                const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
                const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 second timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back all of the changes if anything goes wrong along the way.
 */
struct primary_alloc_req_state {
        struct malloc_heap *heap;
        struct rte_memseg **ms;
        int ms_len;
        struct malloc_elem *elem;
        void *map_addr;
        size_t map_len;
};

enum req_state {
        REQ_STATE_INACTIVE = 0,
        REQ_STATE_ACTIVE,
        REQ_STATE_COMPLETE
};

struct mp_request {
        TAILQ_ENTRY(mp_request) next;
        struct malloc_mp_req user_req; /**< contents of request */
        pthread_cond_t cond; /**< variable we use to time out on this request */
        enum req_state state; /**< indicate status of this request */
        struct primary_alloc_req_state alloc_state;
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to time out earlier than the primary, and send a new request
 * while the primary is still expecting replies to the old one. Therefore, each
 * new request is assigned a new ID, which is how we distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);

static struct {
        struct mp_request_list list;
        pthread_mutex_t lock;
} mp_request_list = {
        .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
        .lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 *  - if allocation failed straight away
 *  - if allocation and sync request succeeded
 *  - if allocation succeeded, sync request failed, allocation rolled back and
 *    rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
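
/*
 * A rough, illustrative sketch (not copied from any real caller) of how a
 * secondary process is expected to drive this module when it needs more
 * memory; the real callers are in the heap code and fill in the full set of
 * allocation parameters:
 *
 *      struct malloc_mp_req req;
 *
 *      memset(&req, 0, sizeof(req));
 *      req.t = REQ_TYPE_ALLOC;
 *      req.alloc_req.page_sz = ...;    // plus size, socket, flags, etc.
 *
 *      if (request_to_primary(&req) != 0 ||
 *                      req.result != REQ_RESULT_SUCCESS)
 *              ...handle failure...
 *
 * request_to_primary() blocks on a condition variable until the primary's
 * response arrives or MP_TIMEOUT_S expires, and reports the outcome back
 * through req.result.
 */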

static struct mp_request *
find_request_by_id(uint64_t id)
{
        struct mp_request *req;
        TAILQ_FOREACH(req, &mp_request_list.list, next) {
                if (req->user_req.id == id)
                        break;
        }
        return req;
}

/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
        uint64_t id;
        do {
                id = rte_rand();
        } while (find_request_by_id(id) != NULL);
        return id;
}

/* secondary will respond to sync requests as follows */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
        struct rte_mp_msg reply;
        const struct malloc_mp_req *req =
                        (const struct malloc_mp_req *)msg->param;
        struct malloc_mp_req *resp =
                        (struct malloc_mp_req *)reply.param;
        int ret;

        if (req->t != REQ_TYPE_SYNC) {
                RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
                return -1;
        }

        memset(&reply, 0, sizeof(reply));

        reply.num_fds = 0;
        strlcpy(reply.name, msg->name, sizeof(reply.name));
        reply.len_param = sizeof(*resp);

        ret = eal_memalloc_sync_with_primary();

        resp->t = REQ_TYPE_SYNC;
        resp->id = req->id;
        resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

        rte_mp_reply(&reply, peer);

        return 0;
}
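
/* primary-side helper: reserve pages on the target heap for a secondary's
 * allocation request and stash everything needed to later commit or roll back
 * the allocation once all processes have synchronized their memory maps.
 */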
static int
handle_alloc_request(const struct malloc_mp_req *m,
                struct mp_request *req)
{
        const struct malloc_req_alloc *ar = &m->alloc_req;
        struct malloc_heap *heap;
        struct malloc_elem *elem;
        struct rte_memseg **ms;
        void *map_addr;
        size_t alloc_sz;
        int n_segs;

        alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
                        MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
        n_segs = alloc_sz / ar->page_sz;

        heap = ar->heap;

        /* we can't know in advance how many pages we'll need, so we malloc */
        ms = malloc(sizeof(*ms) * n_segs);
        if (ms == NULL) {
                RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
                goto fail;
        }
        memset(ms, 0, sizeof(*ms) * n_segs);

        elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
                        ar->flags, ar->align, ar->bound, ar->contig, ms,
                        n_segs);
        if (elem == NULL)
                goto fail;

        map_addr = ms[0]->addr;

        /* we have succeeded in allocating memory, but we still need to sync
         * with other processes. however, since DPDK IPC is single-threaded, we
         * send an asynchronous request and exit this callback.
         */
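        /* stash the allocation state in the request entry; the async
         * completion path in handle_sync_response() uses it to either account
         * the new memory on the heap and notify the secondary, or to roll the
         * allocation back if the sync failed.
         */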

        req->alloc_state.ms = ms;
        req->alloc_state.ms_len = n_segs;
        req->alloc_state.map_addr = map_addr;
        req->alloc_state.map_len = alloc_sz;
        req->alloc_state.elem = elem;
        req->alloc_state.heap = heap;

        return 0;
fail:
        free(ms);
        return -1;
}

/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
        const struct malloc_mp_req *m =
                        (const struct malloc_mp_req *)msg->param;
        struct mp_request *entry;
        int ret;

        /* lock access to request */
        pthread_mutex_lock(&mp_request_list.lock);

        /* make sure it's not a dupe */
        entry = find_request_by_id(m->id);
        if (entry != NULL) {
                RTE_LOG(ERR, EAL, "Duplicate request id\n");
                /* don't free the already-queued entry in the fail path */
                entry = NULL;
                goto fail;
        }

        /* create a new request */
        entry = malloc(sizeof(*entry));
        if (entry == NULL) {
                RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
                goto fail;
        }

        memset(entry, 0, sizeof(*entry));

        if (m->t == REQ_TYPE_ALLOC) {
                ret = handle_alloc_request(m, entry);
        } else if (m->t == REQ_TYPE_FREE) {
                ret = malloc_heap_free_pages(m->free_req.addr,
                                m->free_req.len);
        } else {
                RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
                goto fail;
        }

        if (ret != 0) {
                struct rte_mp_msg resp_msg;
                struct malloc_mp_req *resp =
                                (struct malloc_mp_req *)resp_msg.param;

                memset(&resp_msg, 0, sizeof(resp_msg));

                /* send failure message straight away */
                resp_msg.num_fds = 0;
                resp_msg.len_param = sizeof(*resp);
                strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
                                sizeof(resp_msg.name));

                resp->t = m->t;
                resp->id = m->id;
                resp->result = REQ_RESULT_FAIL;

                if (rte_mp_sendmsg(&resp_msg)) {
                        RTE_LOG(ERR, EAL, "Couldn't send response\n");
                        goto fail;
                }
                /* we did not modify the request */
                free(entry);
        } else {
                struct rte_mp_msg sr_msg;
                struct malloc_mp_req *sr =
                                (struct malloc_mp_req *)sr_msg.param;
                struct timespec ts;

                memset(&sr_msg, 0, sizeof(sr_msg));

                /* we can do something, so send sync request asynchronously */
                sr_msg.num_fds = 0;
                sr_msg.len_param = sizeof(*sr);
                strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

                ts.tv_nsec = 0;
                ts.tv_sec = MP_TIMEOUT_S;

                /* sync requests carry no data */
                sr->t = REQ_TYPE_SYNC;
                sr->id = m->id;

                /* there may be stray timeout still waiting */
                do {
                        ret = rte_mp_request_async(&sr_msg, &ts,
                                        handle_sync_response);
                } while (ret != 0 && rte_errno == EEXIST);
                if (ret != 0) {
                        RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
                        if (m->t == REQ_TYPE_ALLOC)
                                free(entry->alloc_state.ms);
                        goto fail;
                }

                /* mark request as in progress */
                memcpy(&entry->user_req, m, sizeof(*m));
                entry->state = REQ_STATE_ACTIVE;

                TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
        }

        pthread_mutex_unlock(&mp_request_list.lock);
        return 0;
fail:
        pthread_mutex_unlock(&mp_request_list.lock);
        free(entry);
        return -1;
}

/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
                const struct rte_mp_reply *reply)
{
        enum malloc_req_result result;
        struct mp_request *entry;
        const struct malloc_mp_req *mpreq =
                        (const struct malloc_mp_req *)request->param;
        int i;

        /* lock the request */
        pthread_mutex_lock(&mp_request_list.lock);

        entry = find_request_by_id(mpreq->id);
        if (entry == NULL) {
                RTE_LOG(ERR, EAL, "Wrong request ID\n");
                goto fail;
        }

        result = REQ_RESULT_SUCCESS;

        if (reply->nb_received != reply->nb_sent)
                result = REQ_RESULT_FAIL;

        for (i = 0; i < reply->nb_received; i++) {
                struct malloc_mp_req *resp =
                                (struct malloc_mp_req *)reply->msgs[i].param;

                if (resp->t != REQ_TYPE_SYNC) {
                        RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
                        result = REQ_RESULT_FAIL;
                        break;
                }
                if (resp->id != entry->user_req.id) {
                        RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
                        result = REQ_RESULT_FAIL;
                        break;
                }
                if (resp->result == REQ_RESULT_FAIL) {
                        result = REQ_RESULT_FAIL;
                        break;
                }
        }

        if (entry->user_req.t == REQ_TYPE_FREE) {
                struct rte_mp_msg msg;
                struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

                memset(&msg, 0, sizeof(msg));

                /* this is a free request, just sendmsg result */
                resp->t = REQ_TYPE_FREE;
                resp->result = result;
                resp->id = entry->user_req.id;

                msg.num_fds = 0;
                msg.len_param = sizeof(*resp);
                strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

                if (rte_mp_sendmsg(&msg))
                        RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

                TAILQ_REMOVE(&mp_request_list.list, entry, next);
                free(entry);
        } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
                        result == REQ_RESULT_SUCCESS) {
                struct malloc_heap *heap = entry->alloc_state.heap;
                struct rte_mp_msg msg;
                struct malloc_mp_req *resp =
                                (struct malloc_mp_req *)msg.param;

                memset(&msg, 0, sizeof(msg));

                heap->total_size += entry->alloc_state.map_len;

                /* result is success, so just notify secondary about this */
                resp->t = REQ_TYPE_ALLOC;
                resp->result = result;
                resp->id = entry->user_req.id;

                msg.num_fds = 0;
                msg.len_param = sizeof(*resp);
                strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

                if (rte_mp_sendmsg(&msg))
                        RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

                TAILQ_REMOVE(&mp_request_list.list, entry, next);
                free(entry->alloc_state.ms);
                free(entry);
        } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
                        result == REQ_RESULT_FAIL) {
                struct rte_mp_msg rb_msg;
                struct malloc_mp_req *rb =
                                (struct malloc_mp_req *)rb_msg.param;
                struct timespec ts;
                struct primary_alloc_req_state *state =
                                &entry->alloc_state;
                int ret;

                memset(&rb_msg, 0, sizeof(rb_msg));

                /* we've failed to sync, so do a rollback */
                rollback_expand_heap(state->ms, state->ms_len, state->elem,
                                state->map_addr, state->map_len);

                /* send rollback request */
                rb_msg.num_fds = 0;
                rb_msg.len_param = sizeof(*rb);
                strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

                ts.tv_nsec = 0;
                ts.tv_sec = MP_TIMEOUT_S;

                /* sync requests carry no data */
                rb->t = REQ_TYPE_SYNC;
                rb->id = entry->user_req.id;

                /* there may be stray timeout still waiting */
                do {
                        ret = rte_mp_request_async(&rb_msg, &ts,
                                        handle_rollback_response);
                } while (ret != 0 && rte_errno == EEXIST);
                if (ret != 0) {
                        RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

                        /* we couldn't send rollback request, but that's OK -
                         * secondary will time out, and memory has been removed
                         * from the heap anyway.
                         */
                        TAILQ_REMOVE(&mp_request_list.list, entry, next);
                        free(entry->alloc_state.ms);
                        free(entry);
                }
        } else {
                RTE_LOG(ERR, EAL, "Unexpected response to sync request of unknown type\n");
                goto fail;
        }

        pthread_mutex_unlock(&mp_request_list.lock);
        return 0;
fail:
        pthread_mutex_unlock(&mp_request_list.lock);
        return -1;
}
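
/* callback for the rollback request the primary sends after a failed sync:
 * whatever the rollback outcome, the allocation has already been undone, so
 * report failure back to the requesting secondary and drop the request.
 */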
static int
handle_rollback_response(const struct rte_mp_msg *request,
                const struct rte_mp_reply *reply __rte_unused)
{
        struct rte_mp_msg msg;
        struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
        const struct malloc_mp_req *mpreq =
                        (const struct malloc_mp_req *)request->param;
        struct mp_request *entry;

        /* lock the request */
        pthread_mutex_lock(&mp_request_list.lock);

        memset(&msg, 0, sizeof(msg));

        entry = find_request_by_id(mpreq->id);
        if (entry == NULL) {
                RTE_LOG(ERR, EAL, "Wrong request ID\n");
                goto fail;
        }

        if (entry->user_req.t != REQ_TYPE_ALLOC) {
                RTE_LOG(ERR, EAL, "Unexpected active request\n");
                goto fail;
        }

        /* we don't care if rollback succeeded, request still failed */
        resp->t = REQ_TYPE_ALLOC;
        resp->result = REQ_RESULT_FAIL;
        resp->id = mpreq->id;

        msg.num_fds = 0;
        msg.len_param = sizeof(*resp);
        strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

        if (rte_mp_sendmsg(&msg))
                RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

        TAILQ_REMOVE(&mp_request_list.list, entry, next);
        free(entry->alloc_state.ms);
        free(entry);

        pthread_mutex_unlock(&mp_request_list.lock);
        return 0;
fail:
        pthread_mutex_unlock(&mp_request_list.lock);
        return -1;
}

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
        const struct malloc_mp_req *m =
                        (const struct malloc_mp_req *)msg->param;
        struct mp_request *entry;

        pthread_mutex_lock(&mp_request_list.lock);

        entry = find_request_by_id(m->id);
        if (entry != NULL) {
                /* update request status */
                entry->user_req.result = m->result;

                entry->state = REQ_STATE_COMPLETE;

                /* trigger thread wakeup */
                pthread_cond_signal(&entry->cond);
        }

        pthread_mutex_unlock(&mp_request_list.lock);

        return 0;
}

/* synchronously request memory map sync, this is only called whenever primary
 * process initiates the allocation.
 */
int
request_sync(void)
{
        struct rte_mp_msg msg;
        struct rte_mp_reply reply;
        struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
        struct timespec ts;
        int i, ret;

        memset(&msg, 0, sizeof(msg));
        memset(&reply, 0, sizeof(reply));

        /* no need to create tailq entries as this is entirely synchronous */

        msg.num_fds = 0;
        msg.len_param = sizeof(*req);
        strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

        /* sync request carries no data */
        req->t = REQ_TYPE_SYNC;
        req->id = get_unique_id();

        ts.tv_nsec = 0;
        ts.tv_sec = MP_TIMEOUT_S;

        /* there may be stray timeout still waiting */
        do {
                ret = rte_mp_request_sync(&msg, &reply, &ts);
        } while (ret != 0 && rte_errno == EEXIST);
        if (ret != 0) {
                RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
                ret = -1;
                goto out;
        }

        if (reply.nb_received != reply.nb_sent) {
                RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
                ret = -1;
                goto out;
        }

        for (i = 0; i < reply.nb_received; i++) {
                struct malloc_mp_req *resp =
                                (struct malloc_mp_req *)reply.msgs[i].param;
                if (resp->t != REQ_TYPE_SYNC) {
                        RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
                        ret = -1;
                        goto out;
                }
                if (resp->id != req->id) {
                        RTE_LOG(ERR, EAL, "Wrong request ID\n");
                        ret = -1;
                        goto out;
                }
                if (resp->result != REQ_RESULT_SUCCESS) {
                        RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
                        ret = -1;
                        goto out;
                }
        }

        ret = 0;
out:
        free(reply.msgs);
        return ret;
}

/* this is a synchronous wrapper around a bunch of asynchronous requests to
 * primary process. this will initiate a request and wait until responses come.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
        struct rte_mp_msg msg;
        struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
        struct mp_request *entry;
        struct timespec ts;
        struct timeval now;
        int ret;

        memset(&msg, 0, sizeof(msg));
        memset(&ts, 0, sizeof(ts));

        pthread_mutex_lock(&mp_request_list.lock);

        entry = malloc(sizeof(*entry));
        if (entry == NULL) {
                RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
                goto fail;
        }

        memset(entry, 0, sizeof(*entry));

        if (gettimeofday(&now, NULL) < 0) {
                RTE_LOG(ERR, EAL, "Cannot get current time\n");
                goto fail;
        }

        ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
        ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
                        (now.tv_usec * 1000) / 1000000000;

        /* initialize the request */
        pthread_cond_init(&entry->cond, NULL);

        msg.num_fds = 0;
        msg.len_param = sizeof(*msg_req);
        strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

        /* (attempt to) get a unique id */
        user_req->id = get_unique_id();

        /* copy contents of user request into the message */
        memcpy(msg_req, user_req, sizeof(*msg_req));

        if (rte_mp_sendmsg(&msg)) {
                RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
                goto fail;
        }

        /* copy contents of user request into active request */
        memcpy(&entry->user_req, user_req, sizeof(*user_req));

        /* mark request as in progress */
        entry->state = REQ_STATE_ACTIVE;

        TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

        /* finally, wait on timeout */
        do {
                ret = pthread_cond_timedwait(&entry->cond,
                                &mp_request_list.lock, &ts);
        } while (ret != 0 && ret != ETIMEDOUT);

        if (entry->state != REQ_STATE_COMPLETE) {
                RTE_LOG(ERR, EAL, "Request timed out\n");
                ret = -1;
        } else {
                ret = 0;
                user_req->result = entry->user_req.result;
        }
        TAILQ_REMOVE(&mp_request_list.list, entry, next);
        free(entry);

        pthread_mutex_unlock(&mp_request_list.lock);
        return ret;
fail:
        pthread_mutex_unlock(&mp_request_list.lock);
        free(entry);
        return -1;
}
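
/* register all multiprocess actions used by this module. must be called in
 * every process (primary and secondary) before any allocation or free
 * requests are exchanged.
 */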
int
register_mp_requests(void)
{
        if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
                if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request)) {
                        RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
                                MP_ACTION_REQUEST);
                        return -1;
                }
        } else {
                if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
                        RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
                                MP_ACTION_SYNC);
                        return -1;
                }
                if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
                        RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
                                MP_ACTION_ROLLBACK);
                        return -1;
                }
                if (rte_mp_action_register(MP_ACTION_RESPONSE,
                                handle_response)) {
                        RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
                                MP_ACTION_RESPONSE);
                        return -1;
                }
        }
        return 0;
}