ceph/src/spdk/lib/ftl/ftl_rwb.c
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/env.h"
#include "spdk/util.h"

#include "ftl_rwb.h"
#include "ftl_core.h"

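/*
 * The read-write buffer (RWB) groups incoming blocks into fixed-size batches
 * before they are written to the underlying device. As the API below suggests,
 * the usual flow is: a producer acquires an entry (ftl_rwb_acquire), fills it
 * and pushes it back (ftl_rwb_push); once a batch is full it lands on the
 * submission queue, from which the consumer takes it (ftl_rwb_pop) and, after
 * the write completes, either releases it (ftl_rwb_batch_release) or requeues
 * it for retry (ftl_rwb_batch_revert).
 */

/* A single transfer unit: xfer_size entries plus their data/metadata buffers */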
struct ftl_rwb_batch {
        /* Parent RWB */
        struct ftl_rwb *rwb;

        /* Position within RWB */
        unsigned int pos;

        /* Number of acquired entries */
        unsigned int num_acquired;

        /* Number of entries ready for submission */
        unsigned int num_ready;

        /* RWB entry list */
        LIST_HEAD(, ftl_rwb_entry) entry_list;

        /* Entry buffer */
        struct ftl_rwb_entry *entries;

        /* Data buffer */
        void *buffer;

        /* Metadata buffer */
        void *md_buffer;

        /* Queue entry */
        STAILQ_ENTRY(ftl_rwb_batch) stailq;
};

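/* Main RWB descriptor: owns the batch array and the free/active/submit queues */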
struct ftl_rwb {
        /* Number of batches */
        size_t num_batches;

        /* Information for interleaving */
        size_t interleave_offset;
        /* Maximum number of active batches */
        size_t max_active_batches;

        /* Number of entries per batch */
        size_t xfer_size;
        /* Metadata size */
        size_t md_size;

        /* Number of acquired entries */
        unsigned int num_acquired[FTL_RWB_TYPE_MAX];
        /* User/internal limits */
        size_t limits[FTL_RWB_TYPE_MAX];

        /* Active batch queue */
        STAILQ_HEAD(, ftl_rwb_batch) active_queue;
        /* Number of active batches */
        unsigned int num_active_batches;

        /* Free batch queue */
        STAILQ_HEAD(, ftl_rwb_batch) free_queue;
        /* Number of free batches */
        unsigned int num_free_batches;

        /* Submission batch queue */
        struct spdk_ring *submit_queue;
        /* High-priority batch queue */
        struct spdk_ring *prio_queue;

        /* Batch buffer */
        struct ftl_rwb_batch *batches;

        /* RWB lock */
        pthread_spinlock_t lock;
};

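/* Returns true once batch_size reaches the transfer size, i.e. the batch is full */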
static int
ftl_rwb_batch_full(const struct ftl_rwb_batch *batch, size_t batch_size)
{
        struct ftl_rwb *rwb = batch->rwb;
        assert(batch_size <= rwb->xfer_size);
        return batch_size == rwb->xfer_size;
}

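/*
 * Initialize a single entry within a batch: point it at its slice of the data
 * and metadata buffers, set up its spinlock and link it into the batch's
 * entry list.
 */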
static int
ftl_rwb_batch_init_entry(struct ftl_rwb_batch *batch, size_t pos)
{
        struct ftl_rwb *rwb = batch->rwb;
        struct ftl_rwb_entry *entry, *prev;
        size_t batch_offset = pos % rwb->xfer_size;

        entry = &batch->entries[batch_offset];
        entry->pos = pos;
        entry->data = ((char *)batch->buffer) + FTL_BLOCK_SIZE * batch_offset;
        entry->md = rwb->md_size ? ((char *)batch->md_buffer) + rwb->md_size * batch_offset : NULL;
        entry->batch = batch;
        entry->rwb = batch->rwb;

        if (pthread_spin_init(&entry->lock, PTHREAD_PROCESS_PRIVATE)) {
                SPDK_ERRLOG("Spinlock initialization failure\n");
                return -1;
        }

        if (batch_offset > 0) {
                prev = &batch->entries[batch_offset - 1];
                LIST_INSERT_AFTER(prev, entry, list_entry);
        } else {
                LIST_INSERT_HEAD(&batch->entry_list, entry, list_entry);
        }

        return 0;
}

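/* Allocate the entry array plus DMA-able data/metadata buffers for one batch */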
static int
ftl_rwb_batch_init(struct ftl_rwb *rwb, struct ftl_rwb_batch *batch, unsigned int pos)
{
        size_t md_size, i;

        md_size = rwb->md_size * rwb->xfer_size;
        batch->rwb = rwb;
        batch->pos = pos;

        batch->entries = calloc(rwb->xfer_size, sizeof(*batch->entries));
        if (!batch->entries) {
                return -1;
        }

        LIST_INIT(&batch->entry_list);

        batch->buffer = spdk_dma_zmalloc(FTL_BLOCK_SIZE * rwb->xfer_size,
                                         FTL_BLOCK_SIZE, NULL);
        if (!batch->buffer) {
                return -1;
        }

        if (md_size > 0) {
                batch->md_buffer = spdk_dma_zmalloc(md_size, 0, NULL);
                if (!batch->md_buffer) {
                        return -1;
                }
        }

        for (i = 0; i < rwb->xfer_size; ++i) {
                if (ftl_rwb_batch_init_entry(batch, pos * rwb->xfer_size + i)) {
                        return -1;
                }
        }

        return 0;
}

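/*
 * Allocate and initialize the whole RWB: the batch array (sized from
 * conf->rwb_size plus extra batches for interleaving), the MP/SC submission
 * and high-priority rings, and the free/active queues. Returns NULL on any
 * failure; partially initialized state is torn down via ftl_rwb_free().
 */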
struct ftl_rwb *
ftl_rwb_init(const struct spdk_ftl_conf *conf, size_t xfer_size, size_t md_size, size_t num_punits)
{
        struct ftl_rwb *rwb = NULL;
        struct ftl_rwb_batch *batch;
        size_t i;

        rwb = calloc(1, sizeof(*rwb));
        if (!rwb) {
                SPDK_ERRLOG("Memory allocation failure\n");
                return NULL;
        }

        if (pthread_spin_init(&rwb->lock, PTHREAD_PROCESS_PRIVATE)) {
                SPDK_ERRLOG("Spinlock initialization failure\n");
                free(rwb);
                return NULL;
        }

        assert(conf->rwb_size % xfer_size == 0);
        rwb->xfer_size = xfer_size;
        rwb->interleave_offset = xfer_size / conf->num_interleave_units;
        rwb->max_active_batches = conf->num_interleave_units == 1 ? 1 : num_punits;
        rwb->md_size = md_size;
        rwb->num_batches = conf->rwb_size / (FTL_BLOCK_SIZE * xfer_size) + rwb->max_active_batches;

        rwb->batches = calloc(rwb->num_batches, sizeof(*rwb->batches));
        if (!rwb->batches) {
                goto error;
        }

        rwb->submit_queue = spdk_ring_create(SPDK_RING_TYPE_MP_SC,
                                             spdk_align32pow2(rwb->num_batches + 1),
                                             SPDK_ENV_SOCKET_ID_ANY);
        if (!rwb->submit_queue) {
                SPDK_ERRLOG("Failed to create submission queue\n");
                goto error;
        }

        rwb->prio_queue = spdk_ring_create(SPDK_RING_TYPE_MP_SC,
                                           spdk_align32pow2(rwb->num_batches + 1),
                                           SPDK_ENV_SOCKET_ID_ANY);
        if (!rwb->prio_queue) {
                SPDK_ERRLOG("Failed to create high-prio submission queue\n");
                goto error;
        }

        STAILQ_INIT(&rwb->free_queue);
        STAILQ_INIT(&rwb->active_queue);

        for (i = 0; i < rwb->num_batches; ++i) {
                batch = &rwb->batches[i];

                if (ftl_rwb_batch_init(rwb, batch, i)) {
                        SPDK_ERRLOG("Failed to initialize RWB entry buffer\n");
                        goto error;
                }

                STAILQ_INSERT_TAIL(&rwb->free_queue, batch, stailq);
                rwb->num_free_batches++;
        }

        for (unsigned int i = 0; i < FTL_RWB_TYPE_MAX; ++i) {
                rwb->limits[i] = ftl_rwb_entry_cnt(rwb);
        }

        return rwb;
error:
        ftl_rwb_free(rwb);
        return NULL;
}

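/* Release all batch buffers, entry locks, rings and the RWB itself (NULL-safe) */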
void
ftl_rwb_free(struct ftl_rwb *rwb)
{
        struct ftl_rwb_entry *entry;
        struct ftl_rwb_batch *batch;

        if (!rwb) {
                return;
        }

        if (rwb->batches) {
                for (size_t i = 0; i < rwb->num_batches; ++i) {
                        batch = &rwb->batches[i];

                        if (batch->entries) {
                                ftl_rwb_foreach(entry, batch) {
                                        pthread_spin_destroy(&entry->lock);
                                }

                                free(batch->entries);
                        }

                        spdk_dma_free(batch->buffer);
                        spdk_dma_free(batch->md_buffer);
                }
        }

        pthread_spin_destroy(&rwb->lock);
        spdk_ring_free(rwb->submit_queue);
        spdk_ring_free(rwb->prio_queue);
        free(rwb->batches);
        free(rwb);
}

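/*
 * Return a fully written batch to the free queue and drop the per-type
 * acquired-entry counters that were bumped in ftl_rwb_acquire().
 */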
void
ftl_rwb_batch_release(struct ftl_rwb_batch *batch)
{
        struct ftl_rwb *rwb = batch->rwb;
        struct ftl_rwb_entry *entry;
        unsigned int num_acquired __attribute__((unused));

        batch->num_ready = 0;
        batch->num_acquired = 0;

        ftl_rwb_foreach(entry, batch) {
                num_acquired = __atomic_fetch_sub(&rwb->num_acquired[ftl_rwb_entry_type(entry)], 1,
                                                  __ATOMIC_SEQ_CST);
                assert(num_acquired > 0);
        }

        pthread_spin_lock(&rwb->lock);
        STAILQ_INSERT_TAIL(&rwb->free_queue, batch, stailq);
        rwb->num_free_batches++;
        pthread_spin_unlock(&rwb->lock);
}

size_t
ftl_rwb_entry_cnt(const struct ftl_rwb *rwb)
{
        return rwb->num_batches * rwb->xfer_size;
}

size_t
ftl_rwb_num_batches(const struct ftl_rwb *rwb)
{
        return rwb->num_batches;
}

size_t
ftl_rwb_size(const struct ftl_rwb *rwb)
{
        return rwb->num_batches * rwb->xfer_size;
}

size_t
ftl_rwb_batch_get_offset(const struct ftl_rwb_batch *batch)
{
        return batch->pos;
}

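/*
 * Set the per-type (user/internal) caps on acquired entries enforced by
 * ftl_rwb_check_limits(); presumably used by the core to throttle writes.
 */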
void
ftl_rwb_set_limits(struct ftl_rwb *rwb,
                   const size_t limit[FTL_RWB_TYPE_MAX])
{
        assert(limit[FTL_RWB_TYPE_USER] <= ftl_rwb_entry_cnt(rwb));
        assert(limit[FTL_RWB_TYPE_INTERNAL] <= ftl_rwb_entry_cnt(rwb));
        memcpy(rwb->limits, limit, sizeof(rwb->limits));
}

void
ftl_rwb_get_limits(struct ftl_rwb *rwb,
                   size_t limit[FTL_RWB_TYPE_MAX])
{
        memcpy(limit, rwb->limits, sizeof(rwb->limits));
}

size_t
ftl_rwb_num_acquired(struct ftl_rwb *rwb, enum ftl_rwb_entry_type type)
{
        return __atomic_load_n(&rwb->num_acquired[type], __ATOMIC_SEQ_CST);
}

size_t
ftl_rwb_get_active_batches(const struct ftl_rwb *rwb)
{
        return rwb->num_active_batches;
}

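/* Put a batch back on the high-priority queue so it gets resubmitted first */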
void
ftl_rwb_batch_revert(struct ftl_rwb_batch *batch)
{
        struct ftl_rwb *rwb = batch->rwb;

        if (spdk_ring_enqueue(rwb->prio_queue, (void **)&batch, 1, NULL) != 1) {
                assert(0 && "Should never happen");
        }
}

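/*
 * Mark an acquired entry as ready; the last entry to be pushed moves the whole
 * batch onto the submission queue.
 */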
void
ftl_rwb_push(struct ftl_rwb_entry *entry)
{
        struct ftl_rwb_batch *batch = entry->batch;
        struct ftl_rwb *rwb = batch->rwb;
        size_t batch_size;

        batch_size = __atomic_fetch_add(&batch->num_ready, 1, __ATOMIC_SEQ_CST) + 1;

        /* Once all of the entries are put back, push the batch on the
         * submission queue
         */
        if (ftl_rwb_batch_full(batch, batch_size)) {
                if (spdk_ring_enqueue(rwb->submit_queue, (void **)&batch, 1, NULL) != 1) {
                        assert(0 && "Should never happen");
                }
        }
}

static int
ftl_rwb_check_limits(struct ftl_rwb *rwb, enum ftl_rwb_entry_type type)
{
        return ftl_rwb_num_acquired(rwb, type) >= rwb->limits[type];
}

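/*
 * Move up to max_active_batches batches from the free queue to the active
 * queue (only when enough free batches exist to activate them all at once)
 * and return the first active batch. Called with rwb->lock held.
 */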
static struct ftl_rwb_batch *
_ftl_rwb_acquire_batch(struct ftl_rwb *rwb)
{
        struct ftl_rwb_batch *batch;
        size_t i;

        if (rwb->num_free_batches < rwb->max_active_batches) {
                return NULL;
        }

        for (i = 0; i < rwb->max_active_batches; i++) {
                batch = STAILQ_FIRST(&rwb->free_queue);
                STAILQ_REMOVE(&rwb->free_queue, batch, ftl_rwb_batch, stailq);
                rwb->num_free_batches--;

                STAILQ_INSERT_TAIL(&rwb->active_queue, batch, stailq);
                rwb->num_active_batches++;
        }

        return STAILQ_FIRST(&rwb->active_queue);
}

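/*
 * Acquire the next free entry of the current active batch, respecting the
 * per-type limits. Batches are rotated every interleave_offset entries so that
 * consecutive blocks are spread across parallel units; a completely filled
 * batch is taken off the active queue and awaits submission via ftl_rwb_push().
 */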
struct ftl_rwb_entry *
ftl_rwb_acquire(struct ftl_rwb *rwb, enum ftl_rwb_entry_type type)
{
        struct ftl_rwb_entry *entry = NULL;
        struct ftl_rwb_batch *current;

        if (ftl_rwb_check_limits(rwb, type)) {
                return NULL;
        }

        pthread_spin_lock(&rwb->lock);

        current = STAILQ_FIRST(&rwb->active_queue);
        if (!current) {
                current = _ftl_rwb_acquire_batch(rwb);
                if (!current) {
                        goto error;
                }
        }

        entry = &current->entries[current->num_acquired++];

        if (current->num_acquired >= rwb->xfer_size) {
                /* If the whole batch is filled, remove it from the
                 * active_queue, since it will need to move to the submit_queue.
                 */
                STAILQ_REMOVE(&rwb->active_queue, current, ftl_rwb_batch, stailq);
                rwb->num_active_batches--;
        } else if (current->num_acquired % rwb->interleave_offset == 0) {
                /* If the current batch is filled up to the interleaving offset,
                 * move it to the tail of the active_queue so that the next
                 * logical blocks are placed into another batch.
                 */
                STAILQ_REMOVE(&rwb->active_queue, current, ftl_rwb_batch, stailq);
                STAILQ_INSERT_TAIL(&rwb->active_queue, current, stailq);
        }

        pthread_spin_unlock(&rwb->lock);
        __atomic_fetch_add(&rwb->num_acquired[type], 1, __ATOMIC_SEQ_CST);
        return entry;
error:
        pthread_spin_unlock(&rwb->lock);
        return NULL;
}

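/*
 * Turn off interleaving: from now on only one batch is filled at a time and
 * any untouched active batches are returned to the free queue.
 */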
void
ftl_rwb_disable_interleaving(struct ftl_rwb *rwb)
{
        struct ftl_rwb_batch *batch, *temp;

        pthread_spin_lock(&rwb->lock);
        rwb->max_active_batches = 1;
        rwb->interleave_offset = rwb->xfer_size;

        STAILQ_FOREACH_SAFE(batch, &rwb->active_queue, stailq, temp) {
                if (batch->num_acquired == 0) {
                        STAILQ_REMOVE(&rwb->active_queue, batch, ftl_rwb_batch, stailq);
                        rwb->num_active_batches--;

                        assert(batch->num_ready == 0);
                        assert(batch->num_acquired == 0);

                        STAILQ_INSERT_TAIL(&rwb->free_queue, batch, stailq);
                        rwb->num_free_batches++;
                }
        }
        pthread_spin_unlock(&rwb->lock);
}

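/* Dequeue the next batch to write, preferring the high-priority queue */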
struct ftl_rwb_batch *
ftl_rwb_pop(struct ftl_rwb *rwb)
{
        struct ftl_rwb_batch *batch = NULL;

        if (spdk_ring_dequeue(rwb->prio_queue, (void **)&batch, 1) == 1) {
                return batch;
        }

        if (spdk_ring_dequeue(rwb->submit_queue, (void **)&batch, 1) == 1) {
                return batch;
        }

        return NULL;
}

static struct ftl_rwb_batch *
_ftl_rwb_next_batch(struct ftl_rwb *rwb, size_t pos)
{
        if (pos >= rwb->num_batches) {
                return NULL;
        }

        return &rwb->batches[pos];
}

struct ftl_rwb_batch *
ftl_rwb_next_batch(struct ftl_rwb_batch *batch)
{
        return _ftl_rwb_next_batch(batch->rwb, batch->pos + 1);
}

struct ftl_rwb_batch *
ftl_rwb_first_batch(struct ftl_rwb *rwb)
{
        return _ftl_rwb_next_batch(rwb, 0);
}

int
ftl_rwb_batch_empty(struct ftl_rwb_batch *batch)
{
        return __atomic_load_n(&batch->num_ready, __ATOMIC_SEQ_CST) == 0;
}

void *
ftl_rwb_batch_get_data(struct ftl_rwb_batch *batch)
{
        return batch->buffer;
}

void *
ftl_rwb_batch_get_md(struct ftl_rwb_batch *batch)
{
        return batch->md_buffer;
}

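/* Translate a flat entry position (batch * xfer_size + offset) back into an entry pointer */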
struct ftl_rwb_entry *
ftl_rwb_entry_from_offset(struct ftl_rwb *rwb, size_t offset)
{
        unsigned int b_off, e_off;

        b_off = offset / rwb->xfer_size;
        e_off = offset % rwb->xfer_size;

        assert(b_off < rwb->num_batches);

        return &rwb->batches[b_off].entries[e_off];
}

struct ftl_rwb_entry *
ftl_rwb_batch_first_entry(struct ftl_rwb_batch *batch)
{
        return LIST_FIRST(&batch->entry_list);
}