/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017-2018 Solarflare Communications Inc.
 * All rights reserved.
 *
 * This software was jointly developed between OKTET Labs (under contract
 * for Solarflare) and Solarflare Communications, Inc.
 */

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <rte_errno.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_malloc.h>

/*
 * The general idea of the bucket mempool driver is as follows.
 * We keep track of physically contiguous groups (buckets) of objects
 * of a certain size. Every such group has a counter that is
 * incremented every time an object from that group is enqueued.
 * Until the bucket is full, no objects from it are eligible for allocation.
 * If a request is made to dequeue a multiple of the bucket size, it is
 * satisfied by returning whole buckets instead of separate objects.
 */
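/*
 * Illustrative sizing example (the actual numbers depend on the
 * build-time RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB option and on the
 * element size): with a 64 KiB bucket, 64-byte cache lines and a total
 * element size of 2 KiB, a bucket holds (64 KiB - 64 B) / 2 KiB = 31
 * objects.  A request for 62 objects can then be served as two whole
 * buckets, while a request for 10 objects goes through the "orphan"
 * path implemented below.
 */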


struct bucket_header {
	/* Owning lcore, or LCORE_ID_ANY if the bucket is not owned */
	unsigned int lcore_id;
	/* Objects returned to the bucket so far; reset when it is complete */
	uint8_t fill_cnt;
};

/* Per-lcore LIFO of completely filled buckets */
struct bucket_stack {
	unsigned int top;
	unsigned int limit;
	void *objects[];
};

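/*
 * Per-pool driver context.  Completely filled buckets are kept on the
 * stack of the lcore that owns them and are spilled to the shared
 * bucket ring when that stack grows past a threshold, or directly when
 * they fill up without an owner.  Leftover objects that cannot be
 * folded into a whole bucket travel through the shared orphan ring,
 * and objects freed on a foreign lcore are handed back to the owning
 * lcore through its adoption ring.
 */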
struct bucket_data {
	unsigned int header_size;
	unsigned int total_elt_size;
	unsigned int obj_per_bucket;
	unsigned int bucket_stack_thresh;
	uintptr_t bucket_page_mask;
	struct rte_ring *shared_bucket_ring;
	struct bucket_stack *buckets[RTE_MAX_LCORE];
	/*
	 * Multi-producer single-consumer rings to hold objects that are
	 * returned to the mempool on a different lcore than the one they
	 * were initially dequeued on
	 */
	struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
	struct rte_ring *shared_orphan_ring;
	struct rte_mempool *pool;
	unsigned int bucket_mem_size;
};

static struct bucket_stack *
bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
{
	struct bucket_stack *stack;

	stack = rte_zmalloc_socket("bucket_stack",
				   sizeof(struct bucket_stack) +
				   n_elts * sizeof(void *),
				   RTE_CACHE_LINE_SIZE,
				   mp->socket_id);
	if (stack == NULL)
		return NULL;
	stack->limit = n_elts;
	stack->top = 0;

	return stack;
}

static void
bucket_stack_push(struct bucket_stack *stack, void *obj)
{
	RTE_ASSERT(stack->top < stack->limit);
	stack->objects[stack->top++] = obj;
}

static void *
bucket_stack_pop_unsafe(struct bucket_stack *stack)
{
	RTE_ASSERT(stack->top > 0);
	return stack->objects[--stack->top];
}

static void *
bucket_stack_pop(struct bucket_stack *stack)
{
	if (stack->top == 0)
		return NULL;
	return bucket_stack_pop_unsafe(stack);
}

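/*
 * Return a single object to its bucket.  Three cases are handled:
 * the bucket is owned by the current lcore (bump fill_cnt and, once
 * the bucket is complete, push it onto the local stack); the bucket is
 * owned by another lcore (pass the object to that lcore through its
 * adoption ring); the bucket is not owned by any lcore (bump fill_cnt
 * and, once complete, publish the bucket on the shared bucket ring).
 */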
static int
bucket_enqueue_single(struct bucket_data *bd, void *obj)
{
	int rc = 0;
	uintptr_t addr = (uintptr_t)obj;
	struct bucket_header *hdr;
	unsigned int lcore_id = rte_lcore_id();

	addr &= bd->bucket_page_mask;
	hdr = (struct bucket_header *)addr;

	if (likely(hdr->lcore_id == lcore_id)) {
		if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
			hdr->fill_cnt++;
		} else {
			hdr->fill_cnt = 0;
			/* The stack is big enough to hold all buckets */
			bucket_stack_push(bd->buckets[lcore_id], hdr);
		}
	} else if (hdr->lcore_id != LCORE_ID_ANY) {
		struct rte_ring *adopt_ring =
			bd->adoption_buffer_rings[hdr->lcore_id];

		rc = rte_ring_enqueue(adopt_ring, obj);
		/* The ring is big enough to hold all objects */
		RTE_ASSERT(rc == 0);
	} else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
		hdr->fill_cnt++;
	} else {
		hdr->fill_cnt = 0;
		rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
		/* The ring is big enough to hold all buckets */
		RTE_ASSERT(rc == 0);
	}

	return rc;
}

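/*
 * Bulk enqueue: objects are returned one by one, then any full buckets
 * accumulated on the local stack beyond bucket_stack_thresh are
 * spilled to the shared bucket ring so that other lcores can allocate
 * from them.
 */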
static int
bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
	       unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
	unsigned int i;
	int rc = 0;

	for (i = 0; i < n; i++) {
		rc = bucket_enqueue_single(bd, obj_table[i]);
		RTE_ASSERT(rc == 0);
	}
	if (local_stack->top > bd->bucket_stack_thresh) {
		rte_ring_enqueue_bulk(bd->shared_bucket_ring,
				      &local_stack->objects
				      [bd->bucket_stack_thresh],
				      local_stack->top -
				      bd->bucket_stack_thresh,
				      NULL);
		local_stack->top = bd->bucket_stack_thresh;
	}
	return rc;
}

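/*
 * Expand the bucket pointed to by *pstart into n individual object
 * pointers, advancing *pstart to the next unused object.  Returns the
 * new write position in obj_table.
 */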
static void **
bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
		      void **obj_table, unsigned int n)
{
	unsigned int i;
	uint8_t *objptr = *pstart;

	for (objptr += bd->header_size, i = 0; i < n;
	     i++, objptr += bd->total_elt_size)
		*obj_table++ = objptr;
	*pstart = objptr;
	return obj_table;
}

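/*
 * Serve the part of a request that is not a whole number of buckets.
 * Orphans are taken from the shared orphan ring when possible;
 * otherwise a bucket is broken up: its first n_orphans objects satisfy
 * the request and the remainder is pushed onto the orphan ring.
 */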
static int
bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
		       unsigned int n_orphans)
{
	unsigned int i;
	int rc;
	uint8_t *objptr;

	rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
				   n_orphans, NULL);
	if (unlikely(rc != (int)n_orphans)) {
		struct bucket_header *hdr;

		objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
		hdr = (struct bucket_header *)objptr;

		if (objptr == NULL) {
			rc = rte_ring_dequeue(bd->shared_bucket_ring,
					      (void **)&objptr);
			if (rc != 0) {
				rte_errno = ENOBUFS;
				return -rte_errno;
			}
			hdr = (struct bucket_header *)objptr;
			hdr->lcore_id = rte_lcore_id();
		}
		hdr->fill_cnt = 0;
		bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
				      n_orphans);
		for (i = n_orphans; i < bd->obj_per_bucket; i++,
			     objptr += bd->total_elt_size) {
			rc = rte_ring_enqueue(bd->shared_orphan_ring,
					      objptr);
			if (rc != 0) {
				RTE_ASSERT(0);
				rte_errno = -rc;
				return rc;
			}
		}
	}

	return 0;
}

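/*
 * Serve a request for whole buckets: take as many as possible from the
 * local stack, then fall back to the shared bucket ring.  If the ring
 * runs dry, the objects already produced are returned to the pool.
 */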
static int
bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
		       unsigned int n_buckets)
{
	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
	unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
	void **obj_table_base = obj_table;

	n_buckets -= n_buckets_from_stack;
	while (n_buckets_from_stack-- > 0) {
		void *obj = bucket_stack_pop_unsafe(cur_stack);

		obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
						  bd->obj_per_bucket);
	}
	while (n_buckets-- > 0) {
		struct bucket_header *hdr;

		if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
					      (void **)&hdr) != 0)) {
			/*
			 * Return the already-dequeued buffers
			 * back to the mempool
			 */
			bucket_enqueue(bd->pool, obj_table_base,
				       obj_table - obj_table_base);
			rte_errno = ENOBUFS;
			return -rte_errno;
		}
		hdr->lcore_id = rte_lcore_id();
		obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
						  obj_table,
						  bd->obj_per_bucket);
	}

	return 0;
}

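/*
 * Drain this lcore's adoption ring: objects that were freed on other
 * lcores but belong to buckets owned here are re-enqueued locally.
 */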
static int
bucket_adopt_orphans(struct bucket_data *bd)
{
	int rc = 0;
	struct rte_ring *adopt_ring =
		bd->adoption_buffer_rings[rte_lcore_id()];

	if (unlikely(!rte_ring_empty(adopt_ring))) {
		void *orphan;

		while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
			rc = bucket_enqueue_single(bd, orphan);
			RTE_ASSERT(rc == 0);
		}
	}
	return rc;
}

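/*
 * Bulk dequeue: the request is split into whole buckets plus a
 * remainder of "orphan" objects.  If the bucket part fails after the
 * orphans have already been taken, the orphans are given back to the
 * shared orphan ring.
 */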
static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int n_buckets = n / bd->obj_per_bucket;
	unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
	int rc = 0;

	bucket_adopt_orphans(bd);

	if (unlikely(n_orphans > 0)) {
		rc = bucket_dequeue_orphans(bd, obj_table +
					    (n_buckets * bd->obj_per_bucket),
					    n_orphans);
		if (rc != 0)
			return rc;
	}

	if (likely(n_buckets > 0)) {
		rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
		if (unlikely(rc != 0) && n_orphans > 0) {
			rte_ring_enqueue_bulk(bd->shared_orphan_ring,
					      obj_table + (n_buckets *
							   bd->obj_per_bucket),
					      n_orphans, NULL);
		}
	}

	return rc;
}

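/*
 * Dequeue physically contiguous blocks (whole buckets), returning for
 * each block a pointer to its first object (just past the bucket
 * header).  The local stack is used first, then the shared bucket
 * ring; partial progress is rolled back onto the local stack on
 * failure.
 */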
static int
bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
			     unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	const uint32_t header_size = bd->header_size;
	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
	unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
	struct bucket_header *hdr;
	void **first_objp = first_obj_table;

	bucket_adopt_orphans(bd);

	n -= n_buckets_from_stack;
	while (n_buckets_from_stack-- > 0) {
		hdr = bucket_stack_pop_unsafe(cur_stack);
		*first_objp++ = (uint8_t *)hdr + header_size;
	}
	if (n > 0) {
		if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
						   first_objp, n, NULL) != n)) {
			/* Return the already dequeued buckets */
			while (first_objp-- != first_obj_table) {
				bucket_stack_push(cur_stack,
						  (uint8_t *)*first_objp -
						  header_size);
			}
			rte_errno = ENOBUFS;
			return -rte_errno;
		}
		while (n-- > 0) {
			hdr = (struct bucket_header *)*first_objp;
			hdr->lcore_id = rte_lcore_id();
			*first_objp++ = (uint8_t *)hdr + header_size;
		}
	}

	return 0;
}

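/*
 * Helper for bucket_get_count(): walk a memory chunk and accumulate
 * the fill_cnt of every bucket header in it, i.e. the objects sitting
 * in partially filled buckets that are not visible in any stack or
 * ring (completed buckets have fill_cnt reset to zero).
 */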
static void
count_underfilled_buckets(struct rte_mempool *mp,
			  void *opaque,
			  struct rte_mempool_memhdr *memhdr,
			  __rte_unused unsigned int mem_idx)
{
	unsigned int *pcount = opaque;
	const struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz =
		(unsigned int)(~bd->bucket_page_mask + 1);
	uintptr_t align;
	uint8_t *iter;

	align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
		(uintptr_t)memhdr->addr;

	for (iter = (uint8_t *)memhdr->addr + align;
	     iter < (uint8_t *)memhdr->addr + memhdr->len;
	     iter += bucket_page_sz) {
		struct bucket_header *hdr = (struct bucket_header *)iter;

		*pcount += hdr->fill_cnt;
	}
}

static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
	const struct bucket_data *bd = mp->pool_data;
	unsigned int count =
		bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
		rte_ring_count(bd->shared_orphan_ring);
	unsigned int i;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (!rte_lcore_is_enabled(i))
			continue;
		count += bd->obj_per_bucket * bd->buckets[i]->top +
			rte_ring_count(bd->adoption_buffer_rings[i]);
	}

	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
			     count_underfilled_buckets, &count);

	return count;
}

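/*
 * Pool construction: size the bucket from the build-time
 * RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB option, create a stack and an
 * adoption ring for every enabled lcore, and create the shared orphan
 * and bucket rings.  Error paths unwind whatever was already created.
 */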
static int
bucket_alloc(struct rte_mempool *mp)
{
	int rg_flags = 0;
	int rc = 0;
	char rg_name[RTE_RING_NAMESIZE];
	struct bucket_data *bd;
	unsigned int i;
	unsigned int bucket_header_size;

	bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
				RTE_CACHE_LINE_SIZE, mp->socket_id);
	if (bd == NULL) {
		rc = -ENOMEM;
		goto no_mem_for_data;
	}
	bd->pool = mp;
	if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
		bucket_header_size = sizeof(struct bucket_header);
	else
		bucket_header_size = RTE_CACHE_LINE_SIZE;
	RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
	bd->header_size = mp->header_size + bucket_header_size;
	bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
	bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
	bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
		bd->total_elt_size;
	bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
	/* eventually this should be a tunable parameter */
	bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;

	if (mp->flags & MEMPOOL_F_SP_PUT)
		rg_flags |= RING_F_SP_ENQ;
	if (mp->flags & MEMPOOL_F_SC_GET)
		rg_flags |= RING_F_SC_DEQ;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (!rte_lcore_is_enabled(i))
			continue;
		bd->buckets[i] =
			bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
		if (bd->buckets[i] == NULL) {
			rc = -ENOMEM;
			goto no_mem_for_stacks;
		}
		rc = snprintf(rg_name, sizeof(rg_name),
			      RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
		if (rc < 0 || rc >= (int)sizeof(rg_name)) {
			rc = -ENAMETOOLONG;
			goto no_mem_for_stacks;
		}
		bd->adoption_buffer_rings[i] =
			rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
					mp->socket_id,
					rg_flags | RING_F_SC_DEQ);
		if (bd->adoption_buffer_rings[i] == NULL) {
			rc = -rte_errno;
			goto no_mem_for_stacks;
		}
	}

	rc = snprintf(rg_name, sizeof(rg_name),
		      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
		rc = -ENAMETOOLONG;
		goto invalid_shared_orphan_ring;
	}
	bd->shared_orphan_ring =
		rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
				mp->socket_id, rg_flags);
	if (bd->shared_orphan_ring == NULL) {
		rc = -rte_errno;
		goto cannot_create_shared_orphan_ring;
	}

	rc = snprintf(rg_name, sizeof(rg_name),
		      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
		rc = -ENAMETOOLONG;
		goto invalid_shared_bucket_ring;
	}
	bd->shared_bucket_ring =
		rte_ring_create(rg_name,
				rte_align32pow2((mp->size + 1) /
						bd->obj_per_bucket),
				mp->socket_id, rg_flags);
	if (bd->shared_bucket_ring == NULL) {
		rc = -rte_errno;
		goto cannot_create_shared_bucket_ring;
	}

	mp->pool_data = bd;

	return 0;

cannot_create_shared_bucket_ring:
invalid_shared_bucket_ring:
	rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
no_mem_for_stacks:
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		rte_free(bd->buckets[i]);
		rte_ring_free(bd->adoption_buffer_rings[i]);
	}
	rte_free(bd);
no_mem_for_data:
	rte_errno = -rc;
	return rc;
}

static void
bucket_free(struct rte_mempool *mp)
{
	unsigned int i;
	struct bucket_data *bd = mp->pool_data;

	if (bd == NULL)
		return;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		rte_free(bd->buckets[i]);
		rte_ring_free(bd->adoption_buffer_rings[i]);
	}

	rte_ring_free(bd->shared_orphan_ring);
	rte_ring_free(bd->shared_bucket_ring);

	rte_free(bd);
}

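/*
 * Report memory requirements: every bucket needs its own block aligned
 * to bucket_page_sz, so the total is one such block per (possibly
 * partially used) bucket.
 */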
static ssize_t
bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
		     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
		     size_t *align)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz;

	if (bd == NULL)
		return -EINVAL;

	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
	*align = bucket_page_sz;
	*min_total_elt_size = bucket_page_sz;
	/*
	 * Each bucket occupies its own block aligned to
	 * bucket_page_sz, so the required amount of memory is
	 * a multiple of bucket_page_sz.
	 * We also need extra space for a bucket header
	 */
	return ((obj_num + bd->obj_per_bucket - 1) /
		bd->obj_per_bucket) * bucket_page_sz;
}

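/*
 * Populate the pool from a memory chunk: carve it into
 * bucket_page_sz-aligned buckets, initialise each bucket header as
 * unowned, and let the default populate helper lay out the objects
 * after the header.
 */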
static int
bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
		void *vaddr, rte_iova_t iova, size_t len,
		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz;
	unsigned int bucket_header_sz;
	unsigned int n_objs;
	uintptr_t align;
	uint8_t *iter;
	int rc;

	if (bd == NULL)
		return -EINVAL;

	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
	align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
		(uintptr_t)vaddr;

	bucket_header_sz = bd->header_size - mp->header_size;
	if (iova != RTE_BAD_IOVA)
		iova += align + bucket_header_sz;

	for (iter = (uint8_t *)vaddr + align, n_objs = 0;
	     iter < (uint8_t *)vaddr + len && n_objs < max_objs;
	     iter += bucket_page_sz) {
		struct bucket_header *hdr = (struct bucket_header *)iter;
		unsigned int chunk_len = bd->bucket_mem_size;

		if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
			chunk_len = len - (iter - (uint8_t *)vaddr);
		if (chunk_len <= bucket_header_sz)
			break;
		chunk_len -= bucket_header_sz;

		hdr->fill_cnt = 0;
		hdr->lcore_id = LCORE_ID_ANY;
		rc = rte_mempool_op_populate_default(mp,
						     RTE_MIN(bd->obj_per_bucket,
							     max_objs - n_objs),
						     iter + bucket_header_sz,
						     iova, chunk_len,
						     obj_cb, obj_cb_arg);
		if (rc < 0)
			return rc;
		n_objs += rc;
		if (iova != RTE_BAD_IOVA)
			iova += bucket_page_sz;
	}

	return n_objs;
}

static int
bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
{
	struct bucket_data *bd = mp->pool_data;

	info->contig_block_size = bd->obj_per_bucket;
	return 0;
}


static const struct rte_mempool_ops ops_bucket = {
	.name = "bucket",
	.alloc = bucket_alloc,
	.free = bucket_free,
	.enqueue = bucket_enqueue,
	.dequeue = bucket_dequeue,
	.get_count = bucket_get_count,
	.calc_mem_size = bucket_calc_mem_size,
	.populate = bucket_populate,
	.get_info = bucket_get_info,
	.dequeue_contig_blocks = bucket_dequeue_contig_blocks,
};

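/*
 * Usage sketch (illustrative only; the pool name and the sizes below
 * are arbitrary): an application selects this driver by name before
 * populating the pool, e.g.
 *
 *	struct rte_mempool *mp;
 *
 *	mp = rte_mempool_create_empty("example_pool", 8192, 2048,
 *				      0, 0, rte_socket_id(), 0);
 *	if (mp == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create mempool\n");
 *	if (rte_mempool_set_ops_byname(mp, "bucket", NULL) != 0)
 *		rte_exit(EXIT_FAILURE, "cannot set mempool ops\n");
 *	if (rte_mempool_populate_default(mp) < 0)
 *		rte_exit(EXIT_FAILURE, "cannot populate mempool\n");
 */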
MEMPOOL_REGISTER_OPS(ops_bucket);