/* ceph/src/spdk/lib/ftl/ftl_rwb.c */
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "spdk/stdinc.h"
#include "spdk/env.h"	/* spdk_ring and spdk_dma_* APIs used below */
#include "spdk/util.h"

#include "ftl_rwb.h"
#include "ftl_core.h"
struct ftl_rwb_batch {
	/* Parent write buffer */
	struct ftl_rwb				*rwb;

	/* Position within RWB */
	unsigned int				pos;

	/* Number of acquired entries */
	unsigned int				num_acquired;

	/* Number of entries ready for submission */
	unsigned int				num_ready;

	/* List of entries belonging to this batch */
	LIST_HEAD(, ftl_rwb_entry)		entry_list;

	/* Entry buffer */
	struct ftl_rwb_entry			*entries;

	/* Data buffer backing the entries */
	void					*buffer;

	/* Metadata buffer */
	void					*md_buffer;

	/* Queue entry */
	STAILQ_ENTRY(ftl_rwb_batch)		stailq;
};
struct ftl_rwb {
	/* Number of batches */
	size_t					num_batches;

	/* Information for interleaving */
	size_t					interleave_offset;
	/* Maximum number of active batches */
	size_t					max_active_batches;

	/* Number of entries per batch */
	size_t					xfer_size;
	/* Metadata's size */
	size_t					md_size;

	/* Number of acquired entries */
	unsigned int				num_acquired[FTL_RWB_TYPE_MAX];
	/* User/internal limits */
	size_t					limits[FTL_RWB_TYPE_MAX];

	/* Active batch queue */
	STAILQ_HEAD(, ftl_rwb_batch)		active_queue;
	/* Number of active batches */
	unsigned int				num_active_batches;

	/* Free batch queue */
	STAILQ_HEAD(, ftl_rwb_batch)		free_queue;
	/* Number of free batches */
	unsigned int				num_free_batches;

	/* Submission batch queue */
	struct spdk_ring			*submit_queue;
	/* High-priority batch queue */
	struct spdk_ring			*prio_queue;

	/* Batch array */
	struct ftl_rwb_batch			*batches;

	/* RWB lock */
	pthread_spinlock_t			lock;
};
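/*
 * Batch life cycle, as implemented by the functions below: batches start on
 * free_queue, ftl_rwb_acquire() activates up to max_active_batches of them on
 * active_queue and hands out their entries, ftl_rwb_push() enqueues a fully
 * filled batch on submit_queue, ftl_rwb_pop() dequeues it (prio_queue first)
 * for writeback, and ftl_rwb_batch_release() returns it to free_queue.
 */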
static int
ftl_rwb_batch_full(const struct ftl_rwb_batch *batch, size_t batch_size)
{
	struct ftl_rwb *rwb = batch->rwb;
	assert(batch_size <= rwb->xfer_size);
	return batch_size == rwb->xfer_size;
}
static int
ftl_rwb_batch_init_entry(struct ftl_rwb_batch *batch, size_t pos)
{
	struct ftl_rwb *rwb = batch->rwb;
	struct ftl_rwb_entry *entry, *prev;
	size_t batch_offset = pos % rwb->xfer_size;

	entry = &batch->entries[batch_offset];
	entry->data = ((char *)batch->buffer) + FTL_BLOCK_SIZE * batch_offset;
	entry->md = rwb->md_size ? ((char *)batch->md_buffer) + rwb->md_size * batch_offset : NULL;
	entry->batch = batch;
	entry->rwb = batch->rwb;

	if (pthread_spin_init(&entry->lock, PTHREAD_PROCESS_PRIVATE)) {
		SPDK_ERRLOG("Spinlock initialization failure\n");
		return -1;
	}

	if (batch_offset > 0) {
		prev = &batch->entries[batch_offset - 1];
		LIST_INSERT_AFTER(prev, entry, list_entry);
	} else {
		LIST_INSERT_HEAD(&batch->entry_list, entry, list_entry);
	}

	return 0;
}
static int
ftl_rwb_batch_init(struct ftl_rwb *rwb, struct ftl_rwb_batch *batch, unsigned int pos)
{
	size_t md_size, i;

	md_size = rwb->md_size * rwb->xfer_size;
	batch->rwb = rwb;
	batch->pos = pos;

	batch->entries = calloc(rwb->xfer_size, sizeof(*batch->entries));
	if (!batch->entries) {
		return -1;
	}

	LIST_INIT(&batch->entry_list);

	batch->buffer = spdk_dma_zmalloc(FTL_BLOCK_SIZE * rwb->xfer_size,
					 FTL_BLOCK_SIZE, NULL);
	if (!batch->buffer) {
		return -1;
	}

	if (md_size > 0) {
		batch->md_buffer = spdk_dma_zmalloc(md_size, 0, NULL);
		if (!batch->md_buffer) {
			return -1;
		}
	}

	for (i = 0; i < rwb->xfer_size; ++i) {
		if (ftl_rwb_batch_init_entry(batch, pos * rwb->xfer_size + i)) {
			return -1;
		}
	}

	return 0;
}
struct ftl_rwb *
ftl_rwb_init(const struct spdk_ftl_conf *conf, size_t xfer_size, size_t md_size, size_t num_punits)
{
	struct ftl_rwb *rwb = NULL;
	struct ftl_rwb_batch *batch;
	size_t i;

	rwb = calloc(1, sizeof(*rwb));
	if (!rwb) {
		SPDK_ERRLOG("Memory allocation failure\n");
		return NULL;
	}

	if (pthread_spin_init(&rwb->lock, PTHREAD_PROCESS_PRIVATE)) {
		SPDK_ERRLOG("Spinlock initialization failure\n");
		free(rwb);
		return NULL;
	}

	assert(conf->rwb_size % xfer_size == 0);
	rwb->xfer_size = xfer_size;
	rwb->interleave_offset = xfer_size / conf->num_interleave_units;
	rwb->max_active_batches = conf->num_interleave_units == 1 ? 1 : num_punits;
	rwb->md_size = md_size;
	rwb->num_batches = conf->rwb_size / (FTL_BLOCK_SIZE * xfer_size) + rwb->max_active_batches;
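	/*
	 * Illustrative sizing (hypothetical numbers, not from the config):
	 * with rwb_size = 64 MiB, FTL_BLOCK_SIZE = 4 KiB and xfer_size = 16,
	 * the division above yields 64 MiB / (4 KiB * 16) = 1024 batches,
	 * to which max_active_batches is added.
	 */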

	rwb->batches = calloc(rwb->num_batches, sizeof(*rwb->batches));
	if (!rwb->batches) {
		goto error;
	}

	rwb->submit_queue = spdk_ring_create(SPDK_RING_TYPE_MP_SC,
					     spdk_align32pow2(rwb->num_batches + 1),
					     SPDK_ENV_SOCKET_ID_ANY);
	if (!rwb->submit_queue) {
		SPDK_ERRLOG("Failed to create submission queue\n");
		goto error;
	}

	rwb->prio_queue = spdk_ring_create(SPDK_RING_TYPE_MP_SC,
					   spdk_align32pow2(rwb->num_batches + 1),
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!rwb->prio_queue) {
		SPDK_ERRLOG("Failed to create high-prio submission queue\n");
		goto error;
	}

	STAILQ_INIT(&rwb->free_queue);
	STAILQ_INIT(&rwb->active_queue);

	for (i = 0; i < rwb->num_batches; ++i) {
		batch = &rwb->batches[i];

		if (ftl_rwb_batch_init(rwb, batch, i)) {
			SPDK_ERRLOG("Failed to initialize RWB entry buffer\n");
			goto error;
		}

		STAILQ_INSERT_TAIL(&rwb->free_queue, batch, stailq);
		rwb->num_free_batches++;
	}

	for (unsigned int i = 0; i < FTL_RWB_TYPE_MAX; ++i) {
		rwb->limits[i] = ftl_rwb_entry_cnt(rwb);
	}

	return rwb;
error:
	ftl_rwb_free(rwb);
	return NULL;
}
void
ftl_rwb_free(struct ftl_rwb *rwb)
{
	struct ftl_rwb_entry *entry;
	struct ftl_rwb_batch *batch;

	if (!rwb) {
		return;
	}

	if (rwb->batches) {
		for (size_t i = 0; i < rwb->num_batches; ++i) {
			batch = &rwb->batches[i];

			if (batch->entries) {
				ftl_rwb_foreach(entry, batch) {
					pthread_spin_destroy(&entry->lock);
				}

				free(batch->entries);
			}

			spdk_dma_free(batch->buffer);
			spdk_dma_free(batch->md_buffer);
		}
	}

	pthread_spin_destroy(&rwb->lock);
	spdk_ring_free(rwb->submit_queue);
	spdk_ring_free(rwb->prio_queue);
	free(rwb->batches);
	free(rwb);
}
void
ftl_rwb_batch_release(struct ftl_rwb_batch *batch)
{
	struct ftl_rwb *rwb = batch->rwb;
	struct ftl_rwb_entry *entry;
	unsigned int num_acquired __attribute__((unused));

	batch->num_ready = 0;
	batch->num_acquired = 0;

	ftl_rwb_foreach(entry, batch) {
		num_acquired = __atomic_fetch_sub(&rwb->num_acquired[ftl_rwb_entry_type(entry)], 1,
						  __ATOMIC_SEQ_CST);
		assert(num_acquired > 0);
	}

	pthread_spin_lock(&rwb->lock);
	STAILQ_INSERT_TAIL(&rwb->free_queue, batch, stailq);
	rwb->num_free_batches++;
	pthread_spin_unlock(&rwb->lock);
}
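/*
 * Typical consumer flow (sketch only; the caller-side code is illustrative,
 * not taken from this file):
 *
 *	struct ftl_rwb_batch *batch = ftl_rwb_pop(rwb);
 *	if (batch) {
 *		void *data = ftl_rwb_batch_get_data(batch);
 *		void *md = ftl_rwb_batch_get_md(batch);
 *		... write data/md to the underlying device ...
 *		ftl_rwb_batch_release(batch);	// return batch to free_queue
 *	}
 */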
size_t
ftl_rwb_entry_cnt(const struct ftl_rwb *rwb)
{
	return rwb->num_batches * rwb->xfer_size;
}

size_t
ftl_rwb_num_batches(const struct ftl_rwb *rwb)
{
	return rwb->num_batches;
}

size_t
ftl_rwb_size(const struct ftl_rwb *rwb)
{
	return rwb->num_batches * rwb->xfer_size;
}

size_t
ftl_rwb_batch_get_offset(const struct ftl_rwb_batch *batch)
{
	return batch->pos;
}
void
ftl_rwb_set_limits(struct ftl_rwb *rwb,
		   const size_t limit[FTL_RWB_TYPE_MAX])
{
	assert(limit[FTL_RWB_TYPE_USER] <= ftl_rwb_entry_cnt(rwb));
	assert(limit[FTL_RWB_TYPE_INTERNAL] <= ftl_rwb_entry_cnt(rwb));
	memcpy(rwb->limits, limit, sizeof(rwb->limits));
}

void
ftl_rwb_get_limits(struct ftl_rwb *rwb,
		   size_t limit[FTL_RWB_TYPE_MAX])
{
	memcpy(limit, rwb->limits, sizeof(rwb->limits));
}

size_t
ftl_rwb_num_acquired(struct ftl_rwb *rwb, enum ftl_rwb_entry_type type)
{
	return __atomic_load_n(&rwb->num_acquired[type], __ATOMIC_SEQ_CST);
}

size_t
ftl_rwb_get_active_batches(const struct ftl_rwb *rwb)
{
	return rwb->num_active_batches;
}
void
ftl_rwb_batch_revert(struct ftl_rwb_batch *batch)
{
	struct ftl_rwb *rwb = batch->rwb;

	if (spdk_ring_enqueue(rwb->prio_queue, (void **)&batch, 1, NULL) != 1) {
		assert(0 && "Should never happen");
	}
}
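/*
 * Note: a reverted batch is placed on prio_queue, and ftl_rwb_pop() below
 * checks prio_queue before submit_queue, so reverted batches are handed back
 * to the consumer ahead of newly filled ones.
 */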
void
ftl_rwb_push(struct ftl_rwb_entry *entry)
{
	struct ftl_rwb_batch *batch = entry->batch;
	struct ftl_rwb *rwb = batch->rwb;
	size_t batch_size;

	batch_size = __atomic_fetch_add(&batch->num_ready, 1, __ATOMIC_SEQ_CST) + 1;

	/* Once all of the entries are put back, push the batch onto the
	 * submission queue */
	if (ftl_rwb_batch_full(batch, batch_size)) {
		if (spdk_ring_enqueue(rwb->submit_queue, (void **)&batch, 1, NULL) != 1) {
			assert(0 && "Should never happen");
		}
	}
}
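/*
 * Typical producer flow (sketch only; `payload` and the copy size are
 * illustrative, not from this file):
 *
 *	struct ftl_rwb_entry *entry = ftl_rwb_acquire(rwb, FTL_RWB_TYPE_USER);
 *	if (entry) {
 *		memcpy(entry->data, payload, FTL_BLOCK_SIZE);
 *		ftl_rwb_push(entry);	// batch submits once xfer_size entries are ready
 *	}
 *
 * ftl_rwb_acquire() may return NULL when the per-type limit is reached or no
 * batch can be activated, so callers must handle a NULL return.
 */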
static int
ftl_rwb_check_limits(struct ftl_rwb *rwb, enum ftl_rwb_entry_type type)
{
	return ftl_rwb_num_acquired(rwb, type) >= rwb->limits[type];
}
static struct ftl_rwb_batch *
_ftl_rwb_acquire_batch(struct ftl_rwb *rwb)
{
	struct ftl_rwb_batch *batch;
	size_t i;

	if (rwb->num_free_batches < rwb->max_active_batches) {
		return NULL;
	}

	for (i = 0; i < rwb->max_active_batches; i++) {
		batch = STAILQ_FIRST(&rwb->free_queue);
		STAILQ_REMOVE(&rwb->free_queue, batch, ftl_rwb_batch, stailq);
		rwb->num_free_batches--;

		STAILQ_INSERT_TAIL(&rwb->active_queue, batch, stailq);
		rwb->num_active_batches++;
	}

	return STAILQ_FIRST(&rwb->active_queue);
}
struct ftl_rwb_entry *
ftl_rwb_acquire(struct ftl_rwb *rwb, enum ftl_rwb_entry_type type)
{
	struct ftl_rwb_entry *entry = NULL;
	struct ftl_rwb_batch *current;

	if (ftl_rwb_check_limits(rwb, type)) {
		return NULL;
	}

	pthread_spin_lock(&rwb->lock);

	current = STAILQ_FIRST(&rwb->active_queue);
	if (!current) {
		current = _ftl_rwb_acquire_batch(rwb);
		if (!current) {
			goto error;
		}
	}

	entry = &current->entries[current->num_acquired++];

	if (current->num_acquired >= rwb->xfer_size) {
		/* If the whole batch is filled, remove it from active_queue,
		 * since it will need to move to submit_queue */
		STAILQ_REMOVE(&rwb->active_queue, current, ftl_rwb_batch, stailq);
		rwb->num_active_batches--;
	} else if (current->num_acquired % rwb->interleave_offset == 0) {
		/* If the current batch is filled up to the interleaving offset,
		 * move it to the tail of active_queue so the next logical
		 * blocks are placed into another batch. */
		STAILQ_REMOVE(&rwb->active_queue, current, ftl_rwb_batch, stailq);
		STAILQ_INSERT_TAIL(&rwb->active_queue, current, stailq);
	}

	pthread_spin_unlock(&rwb->lock);
	__atomic_fetch_add(&rwb->num_acquired[type], 1, __ATOMIC_SEQ_CST);
	return entry;
error:
	pthread_spin_unlock(&rwb->lock);
	return NULL;
}
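/*
 * Interleaving example (hypothetical numbers): with xfer_size = 16 and
 * conf->num_interleave_units = 4, interleave_offset = 16 / 4 = 4, so after
 * every 4 acquired entries ftl_rwb_acquire() rotates the current batch to
 * the tail of active_queue, spreading consecutive logical blocks across up
 * to max_active_batches batches. With num_interleave_units = 1 the offset
 * equals xfer_size, only one batch is active, and no rotation happens.
 */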
void
ftl_rwb_disable_interleaving(struct ftl_rwb *rwb)
{
	struct ftl_rwb_batch *batch, *temp;

	pthread_spin_lock(&rwb->lock);
	rwb->max_active_batches = 1;
	rwb->interleave_offset = rwb->xfer_size;

	STAILQ_FOREACH_SAFE(batch, &rwb->active_queue, stailq, temp) {
		if (batch->num_acquired == 0) {
			STAILQ_REMOVE(&rwb->active_queue, batch, ftl_rwb_batch, stailq);
			rwb->num_active_batches--;

			assert(batch->num_ready == 0);
			assert(batch->num_acquired == 0);

			STAILQ_INSERT_TAIL(&rwb->free_queue, batch, stailq);
			rwb->num_free_batches++;
		}
	}
	pthread_spin_unlock(&rwb->lock);
}
struct ftl_rwb_batch *
ftl_rwb_pop(struct ftl_rwb *rwb)
{
	struct ftl_rwb_batch *batch = NULL;

	if (spdk_ring_dequeue(rwb->prio_queue, (void **)&batch, 1) == 1) {
		return batch;
	}

	if (spdk_ring_dequeue(rwb->submit_queue, (void **)&batch, 1) == 1) {
		return batch;
	}

	return NULL;
}
static struct ftl_rwb_batch *
_ftl_rwb_next_batch(struct ftl_rwb *rwb, size_t pos)
{
	if (pos >= rwb->num_batches) {
		return NULL;
	}

	return &rwb->batches[pos];
}
struct ftl_rwb_batch *
ftl_rwb_next_batch(struct ftl_rwb_batch *batch)
{
	return _ftl_rwb_next_batch(batch->rwb, batch->pos + 1);
}

struct ftl_rwb_batch *
ftl_rwb_first_batch(struct ftl_rwb *rwb)
{
	return _ftl_rwb_next_batch(rwb, 0);
}

int
ftl_rwb_batch_empty(struct ftl_rwb_batch *batch)
{
	return __atomic_load_n(&batch->num_ready, __ATOMIC_SEQ_CST) == 0;
}

void *
ftl_rwb_batch_get_data(struct ftl_rwb_batch *batch)
{
	return batch->buffer;
}

void *
ftl_rwb_batch_get_md(struct ftl_rwb_batch *batch)
{
	return batch->md_buffer;
}
struct ftl_rwb_entry *
ftl_rwb_entry_from_offset(struct ftl_rwb *rwb, size_t offset)
{
	unsigned int b_off, e_off;

	b_off = offset / rwb->xfer_size;
	e_off = offset % rwb->xfer_size;

	assert(b_off < rwb->num_batches);

	return &rwb->batches[b_off].entries[e_off];
}

struct ftl_rwb_entry *
ftl_rwb_batch_first_entry(struct ftl_rwb_batch *batch)
{
	return LIST_FIRST(&batch->entry_list);
}