/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017-2018 Solarflare Communications Inc.
 * All rights reserved.
 *
 * This software was jointly developed between OKTET Labs (under contract
 * for Solarflare) and Solarflare Communications, Inc.
 */

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <rte_errno.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_malloc.h>

/*
 * The general idea of the bucket mempool driver is as follows.
 * We keep track of physically contiguous groups (buckets) of objects
 * of a certain size. Every such group has a counter that is
 * incremented every time an object from that group is enqueued.
 * Until the bucket is full, no objects from it are eligible for allocation.
 * If a request is made to dequeue a multiple of the bucket size, it is
 * satisfied by returning whole buckets instead of separate objects.
 */

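/*
 * Illustrative only, not part of the driver: a minimal sketch of how an
 * application might select these ops, assuming a hypothetical pool name
 * and sizes. rte_mempool_set_ops_byname() picks the "bucket" ops
 * registered at the end of this file; rte_mempool_get_contig_blocks()
 * exercises the contiguous-block dequeue path implemented below.
 *
 *	struct rte_mempool *mp;
 *	void *block[1];
 *
 *	mp = rte_mempool_create_empty("pkts", 4096, 2048, 0, 0,
 *				      SOCKET_ID_ANY, 0);
 *	if (mp == NULL ||
 *	    rte_mempool_set_ops_byname(mp, "bucket", NULL) != 0 ||
 *	    rte_mempool_populate_default(mp) < 0)
 *		rte_exit(EXIT_FAILURE, "cannot create bucket mempool\n");
 *
 *	// One block is a whole bucket of info.contig_block_size objects
 *	if (rte_mempool_get_contig_blocks(mp, block, 1) == 0)
 *		handle_block(block[0]);	// hypothetical consumer
 */
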
struct bucket_header {
	unsigned int lcore_id;
	uint8_t fill_cnt;
};

struct bucket_stack {
	unsigned int top;
	unsigned int limit;
	void *objects[];
};

struct bucket_data {
	unsigned int header_size;
	unsigned int total_elt_size;
	unsigned int obj_per_bucket;
	unsigned int bucket_stack_thresh;
	uintptr_t bucket_page_mask;
	struct rte_ring *shared_bucket_ring;
	struct bucket_stack *buckets[RTE_MAX_LCORE];
	/*
	 * Multi-producer single-consumer rings to hold objects that are
	 * returned to the mempool on a different lcore than the one
	 * they were initially dequeued on
	 */
	struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
	struct rte_ring *shared_orphan_ring;
	struct rte_mempool *pool;
	unsigned int bucket_mem_size;
};

static struct bucket_stack *
bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
{
	struct bucket_stack *stack;

	stack = rte_zmalloc_socket("bucket_stack",
				   sizeof(struct bucket_stack) +
				   n_elts * sizeof(void *),
				   RTE_CACHE_LINE_SIZE,
				   mp->socket_id);
	if (stack == NULL)
		return NULL;
	stack->limit = n_elts;
	stack->top = 0;

	return stack;
}

static void
bucket_stack_push(struct bucket_stack *stack, void *obj)
{
	RTE_ASSERT(stack->top < stack->limit);
	stack->objects[stack->top++] = obj;
}

static void *
bucket_stack_pop_unsafe(struct bucket_stack *stack)
{
	RTE_ASSERT(stack->top > 0);
	return stack->objects[--stack->top];
}

static void *
bucket_stack_pop(struct bucket_stack *stack)
{
	if (stack->top == 0)
		return NULL;
	return bucket_stack_pop_unsafe(stack);
}

static int
bucket_enqueue_single(struct bucket_data *bd, void *obj)
{
	int rc = 0;
	uintptr_t addr = (uintptr_t)obj;
	struct bucket_header *hdr;
	unsigned int lcore_id = rte_lcore_id();

	addr &= bd->bucket_page_mask;
	hdr = (struct bucket_header *)addr;

	if (likely(hdr->lcore_id == lcore_id)) {
		/* The bucket is owned by this lcore */
		if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
			hdr->fill_cnt++;
		} else {
			hdr->fill_cnt = 0;
			/* Stack is big enough to put all buckets */
			bucket_stack_push(bd->buckets[lcore_id], hdr);
		}
	} else if (hdr->lcore_id != LCORE_ID_ANY) {
		/*
		 * The bucket is owned by another lcore; hand the object
		 * over via that lcore's adoption ring
		 */
		struct rte_ring *adopt_ring =
			bd->adoption_buffer_rings[hdr->lcore_id];

		rc = rte_ring_enqueue(adopt_ring, obj);
		/* Ring is big enough to put all objects */
		RTE_ASSERT(rc == 0);
	} else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
		/* The bucket is not owned by any lcore */
		hdr->fill_cnt++;
	} else {
		hdr->fill_cnt = 0;
		rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
		/* Ring is big enough to put all buckets */
		RTE_ASSERT(rc == 0);
	}

	return rc;
}

static int
bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
	       unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
	unsigned int i;
	int rc = 0;

	for (i = 0; i < n; i++) {
		rc = bucket_enqueue_single(bd, obj_table[i]);
		RTE_ASSERT(rc == 0);
	}
	/* Spill excess full buckets from the local stack to the shared ring */
	if (local_stack->top > bd->bucket_stack_thresh) {
		rte_ring_enqueue_bulk(bd->shared_bucket_ring,
				      &local_stack->objects
				      [bd->bucket_stack_thresh],
				      local_stack->top -
				      bd->bucket_stack_thresh,
				      NULL);
		local_stack->top = bd->bucket_stack_thresh;
	}
	return rc;
}

static void **
bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
		      void **obj_table, unsigned int n)
{
	unsigned int i;
	uint8_t *objptr = *pstart;

	for (objptr += bd->header_size, i = 0; i < n;
	     i++, objptr += bd->total_elt_size)
		*obj_table++ = objptr;
	*pstart = objptr;
	return obj_table;
}

static int
bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
		       unsigned int n_orphans)
{
	unsigned int i;
	int rc;
	uint8_t *objptr;

	rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
				   n_orphans, NULL);
	if (unlikely(rc != (int)n_orphans)) {
		struct bucket_header *hdr;

		/* Not enough orphans: break a bucket open */
		objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
		hdr = (struct bucket_header *)objptr;

		if (objptr == NULL) {
			rc = rte_ring_dequeue(bd->shared_bucket_ring,
					      (void **)&objptr);
			if (rc != 0) {
				rte_errno = ENOBUFS;
				return -rte_errno;
			}
			hdr = (struct bucket_header *)objptr;
			hdr->lcore_id = rte_lcore_id();
		}
		hdr->fill_cnt = 0;
		bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
				      n_orphans);
		/* The leftover objects of the broken bucket become orphans */
		for (i = n_orphans; i < bd->obj_per_bucket; i++,
			     objptr += bd->total_elt_size) {
			rc = rte_ring_enqueue(bd->shared_orphan_ring,
					      objptr);
			if (rc != 0) {
				RTE_ASSERT(0);
				rte_errno = EFAULT;
				return -rte_errno;
			}
		}
	}

	return 0;
}

static int
bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
		       unsigned int n_buckets)
{
	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
	unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
	void **obj_table_base = obj_table;

	n_buckets -= n_buckets_from_stack;
	while (n_buckets_from_stack-- > 0) {
		void *obj = bucket_stack_pop_unsafe(cur_stack);

		obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
						  bd->obj_per_bucket);
	}
	while (n_buckets-- > 0) {
		struct bucket_header *hdr;

		if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
					      (void **)&hdr) != 0)) {
			/*
			 * Return the already-dequeued buffers
			 * back to the mempool
			 */
			bucket_enqueue(bd->pool, obj_table_base,
				       obj_table - obj_table_base);
			rte_errno = ENOBUFS;
			return -rte_errno;
		}
		hdr->lcore_id = rte_lcore_id();
		obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
						  obj_table,
						  bd->obj_per_bucket);
	}

	return 0;
}

static int
bucket_adopt_orphans(struct bucket_data *bd)
{
	int rc = 0;
	struct rte_ring *adopt_ring =
		bd->adoption_buffer_rings[rte_lcore_id()];

	if (unlikely(!rte_ring_empty(adopt_ring))) {
		void *orphan;

		while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
			rc = bucket_enqueue_single(bd, orphan);
			RTE_ASSERT(rc == 0);
		}
	}
	return rc;
}

static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int n_buckets = n / bd->obj_per_bucket;
	unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
	int rc = 0;

	bucket_adopt_orphans(bd);

	if (unlikely(n_orphans > 0)) {
		rc = bucket_dequeue_orphans(bd, obj_table +
					    (n_buckets * bd->obj_per_bucket),
					    n_orphans);
		if (rc != 0)
			return rc;
	}

	if (likely(n_buckets > 0)) {
		rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
		if (unlikely(rc != 0) && n_orphans > 0) {
			/* Undo the orphan dequeue done above */
			rte_ring_enqueue_bulk(bd->shared_orphan_ring,
					      obj_table + (n_buckets *
							   bd->obj_per_bucket),
					      n_orphans, NULL);
		}
	}

	return rc;
}

static int
bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
			     unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	const uint32_t header_size = bd->header_size;
	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
	unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
	struct bucket_header *hdr;
	void **first_objp = first_obj_table;

	bucket_adopt_orphans(bd);

	n -= n_buckets_from_stack;
	while (n_buckets_from_stack-- > 0) {
		hdr = bucket_stack_pop_unsafe(cur_stack);
		*first_objp++ = (uint8_t *)hdr + header_size;
	}
	if (n > 0) {
		if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
						   first_objp, n, NULL) != n)) {
			/* Return the already dequeued buckets */
			while (first_objp-- != first_obj_table) {
				bucket_stack_push(cur_stack,
						  (uint8_t *)*first_objp -
						  header_size);
			}
			rte_errno = ENOBUFS;
			return -rte_errno;
		}
		while (n-- > 0) {
			hdr = (struct bucket_header *)*first_objp;
			hdr->lcore_id = rte_lcore_id();
			*first_objp++ = (uint8_t *)hdr + header_size;
		}
	}

	return 0;
}

static void
count_underfilled_buckets(struct rte_mempool *mp,
			  void *opaque,
			  struct rte_mempool_memhdr *memhdr,
			  __rte_unused unsigned int mem_idx)
{
	unsigned int *pcount = opaque;
	const struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz =
		(unsigned int)(~bd->bucket_page_mask + 1);
	uintptr_t align;
	uint8_t *iter;

	align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
		(uintptr_t)memhdr->addr;

	for (iter = (uint8_t *)memhdr->addr + align;
	     iter < (uint8_t *)memhdr->addr + memhdr->len;
	     iter += bucket_page_sz) {
		struct bucket_header *hdr = (struct bucket_header *)iter;

		*pcount += hdr->fill_cnt;
	}
}

static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
	const struct bucket_data *bd = mp->pool_data;
	unsigned int count =
		bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
		rte_ring_count(bd->shared_orphan_ring);
	unsigned int i;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (!rte_lcore_is_enabled(i))
			continue;
		count += bd->obj_per_bucket * bd->buckets[i]->top +
			rte_ring_count(bd->adoption_buffer_rings[i]);
	}

	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
			     count_underfilled_buckets, &count);

	return count;
}

static int
bucket_alloc(struct rte_mempool *mp)
{
	int rg_flags = 0;
	int rc = 0;
	char rg_name[RTE_RING_NAMESIZE];
	struct bucket_data *bd;
	unsigned int i;
	unsigned int bucket_header_size;

	bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
				RTE_CACHE_LINE_SIZE, mp->socket_id);
	if (bd == NULL) {
		rc = -ENOMEM;
		goto no_mem_for_data;
	}
	bd->pool = mp;
	if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
		bucket_header_size = sizeof(struct bucket_header);
	else
		bucket_header_size = RTE_CACHE_LINE_SIZE;
	RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
	bd->header_size = mp->header_size + bucket_header_size;
	bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
	bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
	bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
		bd->total_elt_size;
	bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
	/* eventually this should be a tunable parameter */
	bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;

	if (mp->flags & MEMPOOL_F_SP_PUT)
		rg_flags |= RING_F_SP_ENQ;
	if (mp->flags & MEMPOOL_F_SC_GET)
		rg_flags |= RING_F_SC_DEQ;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (!rte_lcore_is_enabled(i))
			continue;
		bd->buckets[i] =
			bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
		if (bd->buckets[i] == NULL) {
			rc = -ENOMEM;
			goto no_mem_for_stacks;
		}
		rc = snprintf(rg_name, sizeof(rg_name),
			      RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
		if (rc < 0 || rc >= (int)sizeof(rg_name)) {
			rc = -ENAMETOOLONG;
			goto no_mem_for_stacks;
		}
		bd->adoption_buffer_rings[i] =
			rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
					mp->socket_id,
					rg_flags | RING_F_SC_DEQ);
		if (bd->adoption_buffer_rings[i] == NULL) {
			rc = -rte_errno;
			goto no_mem_for_stacks;
		}
	}

	rc = snprintf(rg_name, sizeof(rg_name),
		      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
		rc = -ENAMETOOLONG;
		goto invalid_shared_orphan_ring;
	}
	bd->shared_orphan_ring =
		rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
				mp->socket_id, rg_flags);
	if (bd->shared_orphan_ring == NULL) {
		rc = -rte_errno;
		goto cannot_create_shared_orphan_ring;
	}

	rc = snprintf(rg_name, sizeof(rg_name),
		      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
		rc = -ENAMETOOLONG;
		goto invalid_shared_bucket_ring;
	}
	bd->shared_bucket_ring =
		rte_ring_create(rg_name,
				rte_align32pow2((mp->size + 1) /
						bd->obj_per_bucket),
				mp->socket_id, rg_flags);
	if (bd->shared_bucket_ring == NULL) {
		rc = -rte_errno;
		goto cannot_create_shared_bucket_ring;
	}

	mp->pool_data = bd;

	return 0;

cannot_create_shared_bucket_ring:
invalid_shared_bucket_ring:
	rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
no_mem_for_stacks:
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		rte_free(bd->buckets[i]);
		rte_ring_free(bd->adoption_buffer_rings[i]);
	}
	rte_free(bd);
no_mem_for_data:
	rte_errno = -rc;
	return rc;
}

static void
bucket_free(struct rte_mempool *mp)
{
	unsigned int i;
	struct bucket_data *bd = mp->pool_data;

	if (bd == NULL)
		return;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		rte_free(bd->buckets[i]);
		rte_ring_free(bd->adoption_buffer_rings[i]);
	}

	rte_ring_free(bd->shared_orphan_ring);
	rte_ring_free(bd->shared_bucket_ring);

	rte_free(bd);
}

static ssize_t
bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
		     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
		     size_t *align)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz;

	if (bd == NULL)
		return -EINVAL;

	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
	*align = bucket_page_sz;
	*min_total_elt_size = bucket_page_sz;
	/*
	 * Each bucket occupies its own block aligned to
	 * bucket_page_sz, so the required amount of memory is
	 * a multiple of bucket_page_sz.
	 * We also need extra space for a bucket header
	 */
	return ((obj_num + bd->obj_per_bucket - 1) /
		bd->obj_per_bucket) * bucket_page_sz;
}

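/*
 * Worked example with hypothetical numbers: with
 * RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB = 64, bucket_page_sz is 64 KiB; with a
 * cache-line (64 byte) bucket header and total_elt_size = 2176,
 * obj_per_bucket = (65536 - 64) / 2176 = 30, so a request for
 * obj_num = 1000 objects returns ((1000 + 30 - 1) / 30) * 65536, i.e.
 * 34 buckets of 64 KiB each.
 */
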
static int
bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
		void *vaddr, rte_iova_t iova, size_t len,
		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz;
	unsigned int bucket_header_sz;
	unsigned int n_objs;
	uintptr_t align;
	uint8_t *iter;
	int rc;

	if (bd == NULL)
		return -EINVAL;

	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
	align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
		(uintptr_t)vaddr;

	bucket_header_sz = bd->header_size - mp->header_size;
	if (iova != RTE_BAD_IOVA)
		iova += align + bucket_header_sz;

	for (iter = (uint8_t *)vaddr + align, n_objs = 0;
	     iter < (uint8_t *)vaddr + len && n_objs < max_objs;
	     iter += bucket_page_sz) {
		struct bucket_header *hdr = (struct bucket_header *)iter;
		unsigned int chunk_len = bd->bucket_mem_size;

		if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
			chunk_len = len - (iter - (uint8_t *)vaddr);
		if (chunk_len <= bucket_header_sz)
			break;
		chunk_len -= bucket_header_sz;

		hdr->fill_cnt = 0;
		hdr->lcore_id = LCORE_ID_ANY;
		rc = rte_mempool_op_populate_default(mp,
						     RTE_MIN(bd->obj_per_bucket,
							     max_objs - n_objs),
						     iter + bucket_header_sz,
						     iova, chunk_len,
						     obj_cb, obj_cb_arg);
		if (rc < 0)
			return rc;
		n_objs += rc;
		if (iova != RTE_BAD_IOVA)
			iova += bucket_page_sz;
	}

	return n_objs;
}

static int
bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
{
	struct bucket_data *bd = mp->pool_data;

	info->contig_block_size = bd->obj_per_bucket;
	return 0;
}

static const struct rte_mempool_ops ops_bucket = {
	.name = "bucket",
	.alloc = bucket_alloc,
	.free = bucket_free,
	.enqueue = bucket_enqueue,
	.dequeue = bucket_dequeue,
	.get_count = bucket_get_count,
	.calc_mem_size = bucket_calc_mem_size,
	.populate = bucket_populate,
	.get_info = bucket_get_info,
	.dequeue_contig_blocks = bucket_dequeue_contig_blocks,
};

MEMPOOL_REGISTER_OPS(ops_bucket);