ceph/src/spdk/lib/bdev/bdev.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "spdk/stdinc.h"
35
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38
39 #include "spdk/config.h"
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 #include "spdk/trace.h"
49
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
52 #include "spdk/string.h"
53
54 #ifdef SPDK_CONFIG_VTUNE
55 #include "ittnotify.h"
56 #include "ittnotify_types.h"
57 int __itt_init_ittlib(const char *, __itt_group_id);
58 #endif
59
60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024)
61 #define SPDK_BDEV_IO_CACHE_SIZE 256
62 #define BUF_SMALL_POOL_SIZE 8192
63 #define BUF_LARGE_POOL_SIZE 1024
64 #define NOMEM_THRESHOLD_COUNT 8
65 #define ZERO_BUFFER_SIZE 0x100000
66
67 #define OWNER_BDEV 0x2
68
69 #define OBJECT_BDEV_IO 0x2
70
71 #define TRACE_GROUP_BDEV 0x3
72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)
74
75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000
76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1
77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512
78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000
79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024)
80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX
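/*
 * QoS limits configured in IOs or bytes per second are enforced in
 * SPDK_BDEV_QOS_TIMESLICE_IN_USEC slices: per-timeslice quotas are derived in
 * spdk_bdev_qos_update_max_quota_per_timeslice() and replenished by
 * spdk_bdev_channel_poll_qos() below.
 */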
81
82 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
83 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"};
84
85 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
86
87 struct spdk_bdev_mgr {
88 struct spdk_mempool *bdev_io_pool;
89
90 struct spdk_mempool *buf_small_pool;
91 struct spdk_mempool *buf_large_pool;
92
93 void *zero_buffer;
94
95 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;
96
97 struct spdk_bdev_list bdevs;
98
99 bool init_complete;
100 bool module_init_complete;
101
102 #ifdef SPDK_CONFIG_VTUNE
103 __itt_domain *domain;
104 #endif
105 };
106
107 static struct spdk_bdev_mgr g_bdev_mgr = {
108 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
109 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
110 .init_complete = false,
111 .module_init_complete = false,
112 };
113
114 static struct spdk_bdev_opts g_bdev_opts = {
115 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
116 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
117 };
118
119 static spdk_bdev_init_cb g_init_cb_fn = NULL;
120 static void *g_init_cb_arg = NULL;
121
122 static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
123 static void *g_fini_cb_arg = NULL;
124 static struct spdk_thread *g_fini_thread = NULL;
125
126 struct spdk_bdev_qos_limit {
127 /** IOs or bytes allowed per second (i.e., 1s). */
128 uint64_t limit;
129
130 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
131 * For remaining bytes, allowed to run negative if an I/O is submitted when
132 * some bytes are remaining, but the I/O is bigger than that amount. The
133 * excess will be deducted from the next timeslice.
134 */
135 int64_t remaining_this_timeslice;
136
137 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
138 uint32_t min_per_timeslice;
139
140 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
141 uint32_t max_per_timeslice;
142 };
143
144 struct spdk_bdev_qos {
145 /** Rate limits, one entry per rate limit type. */
146 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
147
148 /** The channel that all I/O are funneled through. */
149 struct spdk_bdev_channel *ch;
150
151 /** The thread on which the poller is running. */
152 struct spdk_thread *thread;
153
154 /** Queue of I/O waiting to be issued. */
155 bdev_io_tailq_t queued;
156
157 /** Size of a timeslice in tsc ticks. */
158 uint64_t timeslice_size;
159
160 /** Timestamp of start of last timeslice. */
161 uint64_t last_timeslice;
162
163 /** Poller that processes queued I/O commands each time slice. */
164 struct spdk_poller *poller;
165 };
166
167 struct spdk_bdev_mgmt_channel {
168 bdev_io_stailq_t need_buf_small;
169 bdev_io_stailq_t need_buf_large;
170
171 /*
172 * Each thread keeps a cache of bdev_io - this allows
173 * bdev threads which are *not* DPDK threads to still
174 * benefit from a per-thread bdev_io cache. Without
175 * this, non-DPDK threads fetching from the mempool
176 * incur a cmpxchg on get and put.
177 */
178 bdev_io_stailq_t per_thread_cache;
179 uint32_t per_thread_cache_count;
180 uint32_t bdev_io_cache_size;
181
182 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
183 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
184 };
185
186 /*
187 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
188 * will queue their I/O awaiting retry here. This makes it possible to retry
189 * sending I/O to one bdev after I/O from another bdev completes.
190 */
191 struct spdk_bdev_shared_resource {
192 /* The bdev management channel */
193 struct spdk_bdev_mgmt_channel *mgmt_ch;
194
195 /*
196 * Count of I/O submitted to bdev module and waiting for completion.
197 * Incremented before submit_request() is called on an spdk_bdev_io.
198 */
199 uint64_t io_outstanding;
200
201 /*
202 * Queue of IO awaiting retry because of a previous NOMEM status returned
203 * on this channel.
204 */
205 bdev_io_tailq_t nomem_io;
206
207 /*
208 * Threshold which io_outstanding must drop to before retrying nomem_io.
209 */
210 uint64_t nomem_threshold;
211
212 /* I/O channel allocated by a bdev module */
213 struct spdk_io_channel *shared_ch;
214
215 /* Refcount of bdev channels using this resource */
216 uint32_t ref;
217
218 TAILQ_ENTRY(spdk_bdev_shared_resource) link;
219 };
220
221 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0)
222 #define BDEV_CH_QOS_ENABLED (1 << 1)
223
224 struct spdk_bdev_channel {
225 struct spdk_bdev *bdev;
226
227 /* The channel for the underlying device */
228 struct spdk_io_channel *channel;
229
230 /* Per io_device per thread data */
231 struct spdk_bdev_shared_resource *shared_resource;
232
233 struct spdk_bdev_io_stat stat;
234
235 /*
236 * Count of I/O submitted through this channel and waiting for completion.
237 * Incremented before submit_request() is called on an spdk_bdev_io.
238 */
239 uint64_t io_outstanding;
240
241 bdev_io_tailq_t queued_resets;
242
243 uint32_t flags;
244
245 #ifdef SPDK_CONFIG_VTUNE
246 uint64_t start_tsc;
247 uint64_t interval_tsc;
248 __itt_string_handle *handle;
249 struct spdk_bdev_io_stat prev_stat;
250 #endif
251
252 };
253
254 struct spdk_bdev_desc {
255 struct spdk_bdev *bdev;
256 struct spdk_thread *thread;
257 spdk_bdev_remove_cb_t remove_cb;
258 void *remove_ctx;
259 bool remove_scheduled;
260 bool closed;
261 bool write;
262 TAILQ_ENTRY(spdk_bdev_desc) link;
263 };
264
265 struct spdk_bdev_iostat_ctx {
266 struct spdk_bdev_io_stat *stat;
267 spdk_bdev_get_device_stat_cb cb;
268 void *cb_arg;
269 };
270
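/*
 * The bdev pointer offset by one byte is used as the io_device key for the
 * bdev layer's own registration, presumably so it cannot collide with a bdev
 * module that registers the spdk_bdev pointer itself as an io_device.
 */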
271 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1)
272 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1))
273
274 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
275 void *cb_arg);
276 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
277
278 void
279 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
280 {
281 *opts = g_bdev_opts;
282 }
283
284 int
285 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
286 {
287 uint32_t min_pool_size;
288
289 /*
290 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
291 * initialization. A second mgmt_ch will be created on the same thread when the application starts
292 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
293 */
294 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
295 if (opts->bdev_io_pool_size < min_pool_size) {
296 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
297 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
298 spdk_thread_get_count());
299 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
300 return -1;
301 }
302
303 g_bdev_opts = *opts;
304 return 0;
305 }
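
/*
 * Example usage (sketch; the variable and the pool size chosen here are
 * illustrative, not taken from this file):
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 128 * 1024;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		SPDK_ERRLOG("pool size too small for cache size and thread count\n");
 *	}
 */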
306
307 struct spdk_bdev *
308 spdk_bdev_first(void)
309 {
310 struct spdk_bdev *bdev;
311
312 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
313 if (bdev) {
314 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
315 }
316
317 return bdev;
318 }
319
320 struct spdk_bdev *
321 spdk_bdev_next(struct spdk_bdev *prev)
322 {
323 struct spdk_bdev *bdev;
324
325 bdev = TAILQ_NEXT(prev, internal.link);
326 if (bdev) {
327 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
328 }
329
330 return bdev;
331 }
332
333 static struct spdk_bdev *
334 _bdev_next_leaf(struct spdk_bdev *bdev)
335 {
336 while (bdev != NULL) {
337 if (bdev->internal.claim_module == NULL) {
338 return bdev;
339 } else {
340 bdev = TAILQ_NEXT(bdev, internal.link);
341 }
342 }
343
344 return bdev;
345 }
346
347 struct spdk_bdev *
348 spdk_bdev_first_leaf(void)
349 {
350 struct spdk_bdev *bdev;
351
352 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
353
354 if (bdev) {
355 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
356 }
357
358 return bdev;
359 }
360
361 struct spdk_bdev *
362 spdk_bdev_next_leaf(struct spdk_bdev *prev)
363 {
364 struct spdk_bdev *bdev;
365
366 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
367
368 if (bdev) {
369 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
370 }
371
372 return bdev;
373 }
374
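/*
 * Find a registered bdev whose name or any of whose aliases matches bdev_name.
 * Returns NULL if no match is found.
 */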
375 struct spdk_bdev *
376 spdk_bdev_get_by_name(const char *bdev_name)
377 {
378 struct spdk_bdev_alias *tmp;
379 struct spdk_bdev *bdev = spdk_bdev_first();
380
381 while (bdev != NULL) {
382 if (strcmp(bdev_name, bdev->name) == 0) {
383 return bdev;
384 }
385
386 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
387 if (strcmp(bdev_name, tmp->alias) == 0) {
388 return bdev;
389 }
390 }
391
392 bdev = spdk_bdev_next(bdev);
393 }
394
395 return NULL;
396 }
397
398 void
399 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
400 {
401 struct iovec *iovs;
402
403 iovs = bdev_io->u.bdev.iovs;
404
405 assert(iovs != NULL);
406 assert(bdev_io->u.bdev.iovcnt >= 1);
407
408 iovs[0].iov_base = buf;
409 iovs[0].iov_len = len;
410 }
411
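/*
 * Release the data buffer owned by bdev_io: hand it directly to the first I/O
 * waiting for a buffer of the same size class, or return it to its mempool if
 * nothing is waiting.
 */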
412 static void
413 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
414 {
415 struct spdk_mempool *pool;
416 struct spdk_bdev_io *tmp;
417 void *buf, *aligned_buf;
418 bdev_io_stailq_t *stailq;
419 struct spdk_bdev_mgmt_channel *ch;
420
421 assert(bdev_io->u.bdev.iovcnt == 1);
422
423 buf = bdev_io->internal.buf;
424 ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
425
426 bdev_io->internal.buf = NULL;
427
428 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
429 pool = g_bdev_mgr.buf_small_pool;
430 stailq = &ch->need_buf_small;
431 } else {
432 pool = g_bdev_mgr.buf_large_pool;
433 stailq = &ch->need_buf_large;
434 }
435
436 if (STAILQ_EMPTY(stailq)) {
437 spdk_mempool_put(pool, buf);
438 } else {
439 tmp = STAILQ_FIRST(stailq);
440
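/* Align to 512 bytes; the pools allocate an extra 512 bytes per element
 * (see spdk_bdev_initialize()), so the aligned pointer always fits. */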
441 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
442 spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
443
444 STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
445 tmp->internal.buf = buf;
446 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
447 }
448 }
449
450 void
451 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
452 {
453 struct spdk_mempool *pool;
454 bdev_io_stailq_t *stailq;
455 void *buf, *aligned_buf;
456 struct spdk_bdev_mgmt_channel *mgmt_ch;
457
458 assert(cb != NULL);
459 assert(bdev_io->u.bdev.iovs != NULL);
460
461 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
462 /* Buffer already present */
463 cb(bdev_io->internal.ch->channel, bdev_io);
464 return;
465 }
466
467 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
468 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
469
470 bdev_io->internal.buf_len = len;
471 bdev_io->internal.get_buf_cb = cb;
472 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
473 pool = g_bdev_mgr.buf_small_pool;
474 stailq = &mgmt_ch->need_buf_small;
475 } else {
476 pool = g_bdev_mgr.buf_large_pool;
477 stailq = &mgmt_ch->need_buf_large;
478 }
479
480 buf = spdk_mempool_get(pool);
481
482 if (!buf) {
483 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
484 } else {
485 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
486 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
487
488 bdev_io->internal.buf = buf;
489 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
490 }
491 }
492
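/*
 * Largest per-I/O driver context size requested by any registered bdev module;
 * used to size the elements of the spdk_bdev_io mempool.
 */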
493 static int
494 spdk_bdev_module_get_max_ctx_size(void)
495 {
496 struct spdk_bdev_module *bdev_module;
497 int max_bdev_module_size = 0;
498
499 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
500 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
501 max_bdev_module_size = bdev_module->get_ctx_size();
502 }
503 }
504
505 return max_bdev_module_size;
506 }
507
508 void
509 spdk_bdev_config_text(FILE *fp)
510 {
511 struct spdk_bdev_module *bdev_module;
512
513 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
514 if (bdev_module->config_text) {
515 bdev_module->config_text(fp);
516 }
517 }
518 }
519
520 static void
521 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
522 {
523 int i;
524 struct spdk_bdev_qos *qos = bdev->internal.qos;
525 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
526
527 if (!qos) {
528 return;
529 }
530
531 spdk_bdev_get_qos_rate_limits(bdev, limits);
532
533 spdk_json_write_object_begin(w);
534 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");
535 spdk_json_write_name(w, "params");
536
537 spdk_json_write_object_begin(w);
538 spdk_json_write_named_string(w, "name", bdev->name);
539 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
540 if (limits[i] > 0) {
541 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
542 }
543 }
544 spdk_json_write_object_end(w);
545
546 spdk_json_write_object_end(w);
547 }
548
549 void
550 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
551 {
552 struct spdk_bdev_module *bdev_module;
553 struct spdk_bdev *bdev;
554
555 assert(w != NULL);
556
557 spdk_json_write_array_begin(w);
558
559 spdk_json_write_object_begin(w);
560 spdk_json_write_named_string(w, "method", "set_bdev_options");
561 spdk_json_write_name(w, "params");
562 spdk_json_write_object_begin(w);
563 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
564 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
565 spdk_json_write_object_end(w);
566 spdk_json_write_object_end(w);
567
568 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
569 if (bdev_module->config_json) {
570 bdev_module->config_json(w);
571 }
572 }
573
574 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
575 spdk_bdev_qos_config_json(bdev, w);
576
577 if (bdev->fn_table->write_config_json) {
578 bdev->fn_table->write_config_json(bdev, w);
579 }
580 }
581
582 spdk_json_write_array_end(w);
583 }
584
585 static int
586 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
587 {
588 struct spdk_bdev_mgmt_channel *ch = ctx_buf;
589 struct spdk_bdev_io *bdev_io;
590 uint32_t i;
591
592 STAILQ_INIT(&ch->need_buf_small);
593 STAILQ_INIT(&ch->need_buf_large);
594
595 STAILQ_INIT(&ch->per_thread_cache);
596 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
597
598 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
599 ch->per_thread_cache_count = 0;
600 for (i = 0; i < ch->bdev_io_cache_size; i++) {
601 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
602 assert(bdev_io != NULL);
603 ch->per_thread_cache_count++;
604 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
605 }
606
607 TAILQ_INIT(&ch->shared_resources);
608 TAILQ_INIT(&ch->io_wait_queue);
609
610 return 0;
611 }
612
613 static void
614 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
615 {
616 struct spdk_bdev_mgmt_channel *ch = ctx_buf;
617 struct spdk_bdev_io *bdev_io;
618
619 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
620 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
621 }
622
623 if (!TAILQ_EMPTY(&ch->shared_resources)) {
624 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
625 }
626
627 while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
628 bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
629 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
630 ch->per_thread_cache_count--;
631 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
632 }
633
634 assert(ch->per_thread_cache_count == 0);
635 }
636
637 static void
638 spdk_bdev_init_complete(int rc)
639 {
640 spdk_bdev_init_cb cb_fn = g_init_cb_fn;
641 void *cb_arg = g_init_cb_arg;
642 struct spdk_bdev_module *m;
643
644 g_bdev_mgr.init_complete = true;
645 g_init_cb_fn = NULL;
646 g_init_cb_arg = NULL;
647
648 /*
649 * For modules that need to know when subsystem init is complete,
650 * inform them now.
651 */
652 if (rc == 0) {
653 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
654 if (m->init_complete) {
655 m->init_complete();
656 }
657 }
658 }
659
660 cb_fn(cb_arg, rc);
661 }
662
663 static void
664 spdk_bdev_module_action_complete(void)
665 {
666 struct spdk_bdev_module *m;
667
668 /*
669 * Don't finish bdev subsystem initialization if
670 * module pre-initialization is still in progress, or
671 * the subsystem has already been initialized.
672 */
673 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
674 return;
675 }
676
677 /*
678 * Check all bdev modules for inits/examinations in progress. If any
679 * exist, return immediately since we cannot finish bdev subsystem
680 * initialization until all are completed.
681 */
682 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
683 if (m->internal.action_in_progress > 0) {
684 return;
685 }
686 }
687
688 /*
689 * Modules already finished initialization - now that all
690 * the bdev modules have finished their asynchronous I/O
691 * processing, the entire bdev layer can be marked as complete.
692 */
693 spdk_bdev_init_complete(0);
694 }
695
696 static void
697 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
698 {
699 assert(module->internal.action_in_progress > 0);
700 module->internal.action_in_progress--;
701 spdk_bdev_module_action_complete();
702 }
703
704 void
705 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
706 {
707 spdk_bdev_module_action_done(module);
708 }
709
710 void
711 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
712 {
713 spdk_bdev_module_action_done(module);
714 }
715
716 /** The last initialized bdev module */
717 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
718
719 static int
720 spdk_bdev_modules_init(void)
721 {
722 struct spdk_bdev_module *module;
723 int rc = 0;
724
725 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
726 g_resume_bdev_module = module;
727 rc = module->module_init();
728 if (rc != 0) {
729 return rc;
730 }
731 }
732
733 g_resume_bdev_module = NULL;
734 return 0;
735 }
736
737
738 static void
739 spdk_bdev_init_failed_complete(void *cb_arg)
740 {
741 spdk_bdev_init_complete(-1);
742 }
743
744 static void
745 spdk_bdev_init_failed(void *cb_arg)
746 {
747 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
748 }
749
750 void
751 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
752 {
753 struct spdk_conf_section *sp;
754 struct spdk_bdev_opts bdev_opts;
755 int32_t bdev_io_pool_size, bdev_io_cache_size;
756 int cache_size;
757 int rc = 0;
758 char mempool_name[32];
759
760 assert(cb_fn != NULL);
761
762 sp = spdk_conf_find_section(NULL, "Bdev");
763 if (sp != NULL) {
764 spdk_bdev_get_opts(&bdev_opts);
765
766 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
767 if (bdev_io_pool_size >= 0) {
768 bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
769 }
770
771 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
772 if (bdev_io_cache_size >= 0) {
773 bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
774 }
775
776 if (spdk_bdev_set_opts(&bdev_opts)) {
777 spdk_bdev_init_complete(-1);
778 return;
779 }
780
781 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
782 }
783
784 g_init_cb_fn = cb_fn;
785 g_init_cb_arg = cb_arg;
786
787 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
788
789 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
790 g_bdev_opts.bdev_io_pool_size,
791 sizeof(struct spdk_bdev_io) +
792 spdk_bdev_module_get_max_ctx_size(),
793 0,
794 SPDK_ENV_SOCKET_ID_ANY);
795
796 if (g_bdev_mgr.bdev_io_pool == NULL) {
797 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
798 spdk_bdev_init_complete(-1);
799 return;
800 }
801
802 /**
803 * Ensure no more than half of the total buffers end up in local caches, by
804 * using spdk_thread_get_count() to determine how many local caches we need
805 * to account for.
806 */
807 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
808 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
809
810 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
811 BUF_SMALL_POOL_SIZE,
812 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
813 cache_size,
814 SPDK_ENV_SOCKET_ID_ANY);
815 if (!g_bdev_mgr.buf_small_pool) {
816 SPDK_ERRLOG("create rbuf small pool failed\n");
817 spdk_bdev_init_complete(-1);
818 return;
819 }
820
821 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
822 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
823
824 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
825 BUF_LARGE_POOL_SIZE,
826 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
827 cache_size,
828 SPDK_ENV_SOCKET_ID_ANY);
829 if (!g_bdev_mgr.buf_large_pool) {
830 SPDK_ERRLOG("create rbuf large pool failed\n");
831 spdk_bdev_init_complete(-1);
832 return;
833 }
834
835 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
836 NULL);
837 if (!g_bdev_mgr.zero_buffer) {
838 SPDK_ERRLOG("create bdev zero buffer failed\n");
839 spdk_bdev_init_complete(-1);
840 return;
841 }
842
843 #ifdef SPDK_CONFIG_VTUNE
844 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
845 #endif
846
847 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
848 spdk_bdev_mgmt_channel_destroy,
849 sizeof(struct spdk_bdev_mgmt_channel),
850 "bdev_mgr");
851
852 rc = spdk_bdev_modules_init();
853 g_bdev_mgr.module_init_complete = true;
854 if (rc != 0) {
855 SPDK_ERRLOG("bdev modules init failed\n");
856 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
857 return;
858 }
859
860 spdk_bdev_module_action_complete();
861 }
862
863 static void
864 spdk_bdev_mgr_unregister_cb(void *io_device)
865 {
866 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
867
868 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
869 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
870 spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
871 g_bdev_opts.bdev_io_pool_size);
872 }
873
874 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
875 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
876 spdk_mempool_count(g_bdev_mgr.buf_small_pool),
877 BUF_SMALL_POOL_SIZE);
878 assert(false);
879 }
880
881 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
882 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
883 spdk_mempool_count(g_bdev_mgr.buf_large_pool),
884 BUF_LARGE_POOL_SIZE);
885 assert(false);
886 }
887
888 spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
889 spdk_mempool_free(g_bdev_mgr.buf_small_pool);
890 spdk_mempool_free(g_bdev_mgr.buf_large_pool);
891 spdk_dma_free(g_bdev_mgr.zero_buffer);
892
893 cb_fn(g_fini_cb_arg);
894 g_fini_cb_fn = NULL;
895 g_fini_cb_arg = NULL;
896 g_bdev_mgr.init_complete = false;
897 g_bdev_mgr.module_init_complete = false;
898 }
899
900 static void
901 spdk_bdev_module_finish_iter(void *arg)
902 {
903 struct spdk_bdev_module *bdev_module;
904
905 /* Start iterating from the last touched module */
906 if (!g_resume_bdev_module) {
907 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
908 } else {
909 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
910 internal.tailq);
911 }
912
913 while (bdev_module) {
914 if (bdev_module->async_fini) {
915 /* Save our place so we can resume later. We must
916 * save the variable here, before calling module_fini()
917 * below, because in some cases the module may immediately
918 * call spdk_bdev_module_finish_done() and re-enter
919 * this function to continue iterating. */
920 g_resume_bdev_module = bdev_module;
921 }
922
923 if (bdev_module->module_fini) {
924 bdev_module->module_fini();
925 }
926
927 if (bdev_module->async_fini) {
928 return;
929 }
930
931 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
932 internal.tailq);
933 }
934
935 g_resume_bdev_module = NULL;
936 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
937 }
938
939 void
940 spdk_bdev_module_finish_done(void)
941 {
942 if (spdk_get_thread() != g_fini_thread) {
943 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
944 } else {
945 spdk_bdev_module_finish_iter(NULL);
946 }
947 }
948
949 static void
950 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
951 {
952 struct spdk_bdev *bdev = cb_arg;
953
954 if (bdeverrno && bdev) {
955 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
956 bdev->name);
957
958 /*
959 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
960 * bdev; remove it from the list manually and continue with the next bdev in
961 * the list.
962 */
963 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
964 }
965
966 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
967 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
968 /*
969 * Bdev module finish needs to be deferred, as we might be in the middle of some context
970 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
971 * after returning.
972 */
973 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
974 return;
975 }
976
977 /*
978 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
979 * that has no bdevs that depend on it.
980 */
981 bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
982 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
983 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
984 }
985
986 void
987 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
988 {
989 struct spdk_bdev_module *m;
990
991 assert(cb_fn != NULL);
992
993 g_fini_thread = spdk_get_thread();
994
995 g_fini_cb_fn = cb_fn;
996 g_fini_cb_arg = cb_arg;
997
998 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
999 if (m->fini_start) {
1000 m->fini_start();
1001 }
1002 }
1003
1004 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
1005 }
1006
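/*
 * Allocate a bdev_io, preferring the per-thread cache over the global pool.
 * Returns NULL if the cache is empty and either the global pool is exhausted
 * or other callers are already waiting on the io_wait_queue.
 */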
1007 static struct spdk_bdev_io *
1008 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
1009 {
1010 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
1011 struct spdk_bdev_io *bdev_io;
1012
1013 if (ch->per_thread_cache_count > 0) {
1014 bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
1015 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
1016 ch->per_thread_cache_count--;
1017 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
1018 /*
1019 * Don't try to look for bdev_ios in the global pool if there are
1020 * waiters on bdev_ios - we don't want this caller to jump the line.
1021 */
1022 bdev_io = NULL;
1023 } else {
1024 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
1025 }
1026
1027 return bdev_io;
1028 }
1029
1030 void
1031 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1032 {
1033 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
1034
1035 assert(bdev_io != NULL);
1036 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
1037
1038 if (bdev_io->internal.buf != NULL) {
1039 spdk_bdev_io_put_buf(bdev_io);
1040 }
1041
1042 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
1043 ch->per_thread_cache_count++;
1044 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
1045 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1046 struct spdk_bdev_io_wait_entry *entry;
1047
1048 entry = TAILQ_FIRST(&ch->io_wait_queue);
1049 TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1050 entry->cb_fn(entry->cb_arg);
1051 }
1052 } else {
1053 /* We should never have a full cache with entries on the io wait queue. */
1054 assert(TAILQ_EMPTY(&ch->io_wait_queue));
1055 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1056 }
1057 }
1058
1059 static bool
1060 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
1061 {
1062 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1063
1064 switch (limit) {
1065 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1066 return true;
1067 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1068 return false;
1069 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1070 default:
1071 return false;
1072 }
1073 }
1074
1075 static bool
1076 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
1077 {
1078 switch (bdev_io->type) {
1079 case SPDK_BDEV_IO_TYPE_NVME_IO:
1080 case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1081 case SPDK_BDEV_IO_TYPE_READ:
1082 case SPDK_BDEV_IO_TYPE_WRITE:
1083 case SPDK_BDEV_IO_TYPE_UNMAP:
1084 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1085 return true;
1086 default:
1087 return false;
1088 }
1089 }
1090
1091 static uint64_t
1092 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1093 {
1094 struct spdk_bdev *bdev = bdev_io->bdev;
1095
1096 switch (bdev_io->type) {
1097 case SPDK_BDEV_IO_TYPE_NVME_IO:
1098 case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1099 return bdev_io->u.nvme_passthru.nbytes;
1100 case SPDK_BDEV_IO_TYPE_READ:
1101 case SPDK_BDEV_IO_TYPE_WRITE:
1102 case SPDK_BDEV_IO_TYPE_UNMAP:
1103 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1104 return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1105 default:
1106 return 0;
1107 }
1108 }
1109
1110 static void
1111 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
1112 {
1113 int i;
1114
1115 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1116 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1117 continue;
1118 }
1119
1120 switch (i) {
1121 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1122 qos->rate_limits[i].remaining_this_timeslice--;
1123 break;
1124 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1125 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
1126 break;
1127 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1128 default:
1129 break;
1130 }
1131 }
1132 }
1133
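/*
 * Drain the QoS queue for this channel, submitting I/O until the queue empties
 * or any enabled rate limit runs out of quota for the current timeslice.
 */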
1134 static void
1135 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
1136 {
1137 struct spdk_bdev_io *bdev_io = NULL;
1138 struct spdk_bdev *bdev = ch->bdev;
1139 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1140 int i;
1141 bool to_limit_io;
1142 uint64_t io_size_in_byte;
1143
1144 while (!TAILQ_EMPTY(&qos->queued)) {
1145 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1146 if (qos->rate_limits[i].max_per_timeslice > 0 &&
1147 (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
1148 return;
1149 }
1150 }
1151
1152 bdev_io = TAILQ_FIRST(&qos->queued);
1153 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1154 ch->io_outstanding++;
1155 shared_resource->io_outstanding++;
1156 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
1157 if (to_limit_io == true) {
1158 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
1159 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
1160 }
1161 bdev->fn_table->submit_request(ch->channel, bdev_io);
1162 }
1163 }
1164
1165 static void
1166 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
1167 {
1168 int rc;
1169
1170 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1171 bdev_io->internal.waitq_entry.cb_fn = cb_fn;
1172 bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1173 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1174 &bdev_io->internal.waitq_entry);
1175 if (rc != 0) {
1176 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
1177 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1178 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1179 }
1180 }
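
/*
 * The same wait-queue mechanism is available to bdev users: if a submission
 * call returns -ENOMEM, fill in an spdk_bdev_io_wait_entry and call
 * spdk_bdev_queue_io_wait() to be called back once a bdev_io frees up.
 * Sketch (retry_read, entry, desc, io_ch, buf, off, len, done_cb and ctx are
 * illustrative caller-side names):
 *
 *	if (spdk_bdev_read(desc, io_ch, buf, off, len, done_cb, ctx) == -ENOMEM) {
 *		entry.bdev = bdev;
 *		entry.cb_fn = retry_read;
 *		entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 *	}
 */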
1181
1182 static bool
1183 _spdk_bdev_io_type_can_split(uint8_t type)
1184 {
1185 assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1186 assert(type < SPDK_BDEV_NUM_IO_TYPES);
1187
1188 /* Only split READ and WRITE I/O. Theoretically other types of I/O like
1189 * UNMAP could be split, but these types of I/O are typically much larger
1190 * in size (sometimes the size of the entire block device), and the bdev
1191 * module can more efficiently split these types of I/O. Plus those types
1192 * of I/O do not have a payload, which makes the splitting process simpler.
1193 */
1194 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1195 return true;
1196 } else {
1197 return false;
1198 }
1199 }
1200
1201 static bool
1202 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1203 {
1204 uint64_t start_stripe, end_stripe;
1205 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1206
1207 if (io_boundary == 0) {
1208 return false;
1209 }
1210
1211 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1212 return false;
1213 }
1214
1215 start_stripe = bdev_io->u.bdev.offset_blocks;
1216 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1217 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
1218 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1219 start_stripe >>= spdk_u32log2(io_boundary);
1220 end_stripe >>= spdk_u32log2(io_boundary);
1221 } else {
1222 start_stripe /= io_boundary;
1223 end_stripe /= io_boundary;
1224 }
1225 return (start_stripe != end_stripe);
1226 }
1227
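/* Number of blocks from offset up to the next multiple of boundary. */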
1228 static uint32_t
1229 _to_next_boundary(uint64_t offset, uint32_t boundary)
1230 {
1231 return (boundary - (offset % boundary));
1232 }
1233
1234 static void
1235 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1236
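/*
 * Carve the parent I/O into child I/Os that each end on an optimal_io_boundary
 * stripe edge, building child iovecs in bdev_io->child_iov from the parent's
 * iovecs. Progress is tracked in split_current_offset_blocks and
 * split_remaining_num_blocks so the function can resume from
 * _spdk_bdev_io_split_done() or after an ENOMEM wait.
 */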
1237 static void
1238 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1239 {
1240 struct spdk_bdev_io *bdev_io = _bdev_io;
1241 uint64_t current_offset, remaining;
1242 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1243 struct iovec *parent_iov, *iov;
1244 uint64_t parent_iov_offset, iov_len;
1245 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
1246 int rc;
1247
1248 remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1249 current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1250 blocklen = bdev_io->bdev->blocklen;
1251 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1252 parent_iovcnt = bdev_io->u.bdev.iovcnt;
1253
1254 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
1255 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
1256 if (parent_iov_offset < parent_iov->iov_len) {
1257 break;
1258 }
1259 parent_iov_offset -= parent_iov->iov_len;
1260 }
1261
1262 child_iovcnt = 0;
1263 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1264 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1265 to_next_boundary = spdk_min(remaining, to_next_boundary);
1266 to_next_boundary_bytes = to_next_boundary * blocklen;
1267 iov = &bdev_io->child_iov[child_iovcnt];
1268 iovcnt = 0;
1269 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
1270 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1271 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
1272 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1273 to_next_boundary_bytes -= iov_len;
1274
1275 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1276 bdev_io->child_iov[child_iovcnt].iov_len = iov_len;
1277
1278 if (iov_len < parent_iov->iov_len - parent_iov_offset) {
1279 parent_iov_offset += iov_len;
1280 } else {
1281 parent_iovpos++;
1282 parent_iov_offset = 0;
1283 }
1284 child_iovcnt++;
1285 iovcnt++;
1286 }
1287
1288 if (to_next_boundary_bytes > 0) {
1289 /* We had to stop this child I/O early because we ran out of
1290 * child_iov space. Make sure the iovs collected are valid and
1291 * then adjust to_next_boundary before starting the child I/O.
1292 */
1293 if ((to_next_boundary_bytes % blocklen) != 0) {
1294 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
1295 to_next_boundary_bytes, blocklen);
1296 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1297 if (bdev_io->u.bdev.split_outstanding == 0) {
1298 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1299 }
1300 return;
1301 }
1302 to_next_boundary -= to_next_boundary_bytes / blocklen;
1303 }
1304
1305 bdev_io->u.bdev.split_outstanding++;
1306
1307 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1308 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1309 spdk_io_channel_from_ctx(bdev_io->internal.ch),
1310 iov, iovcnt, current_offset, to_next_boundary,
1311 _spdk_bdev_io_split_done, bdev_io);
1312 } else {
1313 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1314 spdk_io_channel_from_ctx(bdev_io->internal.ch),
1315 iov, iovcnt, current_offset, to_next_boundary,
1316 _spdk_bdev_io_split_done, bdev_io);
1317 }
1318
1319 if (rc == 0) {
1320 current_offset += to_next_boundary;
1321 remaining -= to_next_boundary;
1322 bdev_io->u.bdev.split_current_offset_blocks = current_offset;
1323 bdev_io->u.bdev.split_remaining_num_blocks = remaining;
1324 } else {
1325 bdev_io->u.bdev.split_outstanding--;
1326 if (rc == -ENOMEM) {
1327 if (bdev_io->u.bdev.split_outstanding == 0) {
1328 /* No I/O is outstanding. Hence we should wait here. */
1329 _spdk_bdev_queue_io_wait_with_cb(bdev_io,
1330 _spdk_bdev_io_split_with_payload);
1331 }
1332 } else {
1333 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1334 if (bdev_io->u.bdev.split_outstanding == 0) {
1335 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1336 }
1337 }
1338
1339 return;
1340 }
1341 }
1342 }
1343
1344 static void
1345 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1346 {
1347 struct spdk_bdev_io *parent_io = cb_arg;
1348
1349 spdk_bdev_free_io(bdev_io);
1350
1351 if (!success) {
1352 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1353 }
1354 parent_io->u.bdev.split_outstanding--;
1355 if (parent_io->u.bdev.split_outstanding != 0) {
1356 return;
1357 }
1358
1359 /*
1360 * Parent I/O finishes when all blocks are consumed or there is any failure of
1361 * child I/O and no outstanding child I/O.
1362 */
1363 if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
1364 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
1365 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
1366 parent_io->internal.caller_ctx);
1367 return;
1368 }
1369
1370 /*
1371 * Continue with the splitting process. This function will complete the parent I/O if the
1372 * splitting is done.
1373 */
1374 _spdk_bdev_io_split_with_payload(parent_io);
1375 }
1376
1377 static void
1378 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1379 {
1380 assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1381
1382 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1383 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1384 bdev_io->u.bdev.split_outstanding = 0;
1385 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1386
1387 _spdk_bdev_io_split_with_payload(bdev_io);
1388 }
1389
1390 static void
1391 _spdk_bdev_io_submit(void *ctx)
1392 {
1393 struct spdk_bdev_io *bdev_io = ctx;
1394 struct spdk_bdev *bdev = bdev_io->bdev;
1395 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1396 struct spdk_io_channel *ch = bdev_ch->channel;
1397 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1398 uint64_t tsc;
1399
1400 tsc = spdk_get_ticks();
1401 bdev_io->internal.submit_tsc = tsc;
1402 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
1403 bdev_ch->io_outstanding++;
1404 shared_resource->io_outstanding++;
1405 bdev_io->internal.in_submit_request = true;
1406 if (spdk_likely(bdev_ch->flags == 0)) {
1407 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1408 bdev->fn_table->submit_request(ch, bdev_io);
1409 } else {
1410 bdev_ch->io_outstanding--;
1411 shared_resource->io_outstanding--;
1412 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1413 }
1414 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1415 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1416 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1417 bdev_ch->io_outstanding--;
1418 shared_resource->io_outstanding--;
1419 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1420 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
1421 } else {
1422 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1423 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1424 }
1425 bdev_io->internal.in_submit_request = false;
1426 }
1427
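/*
 * Route a bdev_io: split it first if it spans an optimal I/O boundary, send it
 * to the QoS channel's thread if QoS is enabled on this channel, otherwise
 * submit it directly.
 */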
1428 static void
1429 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1430 {
1431 struct spdk_bdev *bdev = bdev_io->bdev;
1432 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1433
1434 assert(thread != NULL);
1435 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1436
1437 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1438 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1439 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
1440 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1441 } else {
1442 _spdk_bdev_io_split(NULL, bdev_io);
1443 }
1444 return;
1445 }
1446
1447 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1448 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1449 _spdk_bdev_io_submit(bdev_io);
1450 } else {
1451 bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1452 bdev_io->internal.ch = bdev->internal.qos->ch;
1453 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1454 }
1455 } else {
1456 _spdk_bdev_io_submit(bdev_io);
1457 }
1458 }
1459
1460 static void
1461 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1462 {
1463 struct spdk_bdev *bdev = bdev_io->bdev;
1464 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1465 struct spdk_io_channel *ch = bdev_ch->channel;
1466
1467 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1468
1469 bdev_io->internal.in_submit_request = true;
1470 bdev->fn_table->submit_request(ch, bdev_io);
1471 bdev_io->internal.in_submit_request = false;
1472 }
1473
1474 static void
1475 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1476 struct spdk_bdev *bdev, void *cb_arg,
1477 spdk_bdev_io_completion_cb cb)
1478 {
1479 bdev_io->bdev = bdev;
1480 bdev_io->internal.caller_ctx = cb_arg;
1481 bdev_io->internal.cb = cb;
1482 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1483 bdev_io->internal.in_submit_request = false;
1484 bdev_io->internal.buf = NULL;
1485 bdev_io->internal.io_submit_ch = NULL;
1486 }
1487
1488 static bool
1489 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1490 {
1491 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1492 }
1493
1494 bool
1495 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1496 {
1497 bool supported;
1498
1499 supported = _spdk_bdev_io_type_supported(bdev, io_type);
1500
1501 if (!supported) {
1502 switch (io_type) {
1503 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1504 /* The bdev layer will emulate write zeroes as long as write is supported. */
1505 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1506 break;
1507 default:
1508 break;
1509 }
1510 }
1511
1512 return supported;
1513 }
1514
1515 int
1516 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1517 {
1518 if (bdev->fn_table->dump_info_json) {
1519 return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1520 }
1521
1522 return 0;
1523 }
1524
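/*
 * Recompute each rate limit's per-timeslice quota from its per-second limit.
 * For example, a 10000 IO/s limit with the 1000 usec timeslice yields
 * 10000 * 1000 / 1000000 = 10 I/Os per timeslice, floored at
 * SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE.
 */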
1525 static void
1526 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1527 {
1528 uint32_t max_per_timeslice = 0;
1529 int i;
1530
1531 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1532 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1533 qos->rate_limits[i].max_per_timeslice = 0;
1534 continue;
1535 }
1536
1537 max_per_timeslice = qos->rate_limits[i].limit *
1538 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
1539
1540 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
1541 qos->rate_limits[i].min_per_timeslice);
1542
1543 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
1544 }
1545 }
1546
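/*
 * QoS poller, registered on the QoS thread. Once at least one timeslice has
 * elapsed it discards any unused quota (keeping a negative value if the last
 * slice overran), adds max_per_timeslice for each elapsed slice, and submits
 * queued I/O.
 */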
1547 static int
1548 spdk_bdev_channel_poll_qos(void *arg)
1549 {
1550 struct spdk_bdev_qos *qos = arg;
1551 uint64_t now = spdk_get_ticks();
1552 int i;
1553
1554 if (now < (qos->last_timeslice + qos->timeslice_size)) {
1555 /* We received our callback earlier than expected - return
1556 * immediately and wait to do accounting until at least one
1557 * timeslice has actually expired. This should never happen
1558 * with a well-behaved timer implementation.
1559 */
1560 return 0;
1561 }
1562
1563 /* Reset for next round of rate limiting */
1564 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1565 /* We may have allowed the IOs or bytes to slightly overrun in the last
1566 * timeslice. remaining_this_timeslice is signed, so if it's negative
1567 * here, we'll account for the overrun so that the next timeslice will
1568 * be appropriately reduced.
1569 */
1570 if (qos->rate_limits[i].remaining_this_timeslice > 0) {
1571 qos->rate_limits[i].remaining_this_timeslice = 0;
1572 }
1573 }
1574
1575 while (now >= (qos->last_timeslice + qos->timeslice_size)) {
1576 qos->last_timeslice += qos->timeslice_size;
1577 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1578 qos->rate_limits[i].remaining_this_timeslice +=
1579 qos->rate_limits[i].max_per_timeslice;
1580 }
1581 }
1582
1583 _spdk_bdev_qos_io_submit(qos->ch, qos);
1584
1585 return -1;
1586 }
1587
1588 static void
1589 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1590 {
1591 struct spdk_bdev_shared_resource *shared_resource;
1592
1593 if (!ch) {
1594 return;
1595 }
1596
1597 if (ch->channel) {
1598 spdk_put_io_channel(ch->channel);
1599 }
1600
1601 assert(ch->io_outstanding == 0);
1602
1603 shared_resource = ch->shared_resource;
1604 if (shared_resource) {
1605 assert(ch->io_outstanding == 0);
1606 assert(shared_resource->ref > 0);
1607 shared_resource->ref--;
1608 if (shared_resource->ref == 0) {
1609 assert(shared_resource->io_outstanding == 0);
1610 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1611 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1612 free(shared_resource);
1613 }
1614 }
1615 }
1616
1617 /* Caller must hold bdev->internal.mutex. */
1618 static void
1619 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1620 {
1621 struct spdk_bdev_qos *qos = bdev->internal.qos;
1622 int i;
1623
1624 /* Rate limiting is enabled on this bdev if a QoS structure exists */
1625 if (qos) {
1626 if (qos->ch == NULL) {
1627 struct spdk_io_channel *io_ch;
1628
1629 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1630 bdev->name, spdk_get_thread());
1631
1632 /* No qos channel has been selected, so set one up */
1633
1634 /* Take another reference to ch */
1635 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1636 qos->ch = ch;
1637
1638 qos->thread = spdk_io_channel_get_thread(io_ch);
1639
1640 TAILQ_INIT(&qos->queued);
1641
1642 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1643 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
1644 qos->rate_limits[i].min_per_timeslice =
1645 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
1646 } else {
1647 qos->rate_limits[i].min_per_timeslice =
1648 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
1649 }
1650
1651 if (qos->rate_limits[i].limit == 0) {
1652 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
1653 }
1654 }
1655 spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1656 qos->timeslice_size =
1657 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
1658 qos->last_timeslice = spdk_get_ticks();
1659 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1660 qos,
1661 SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1662 }
1663
1664 ch->flags |= BDEV_CH_QOS_ENABLED;
1665 }
1666 }
1667
1668 static int
1669 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1670 {
1671 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
1672 struct spdk_bdev_channel *ch = ctx_buf;
1673 struct spdk_io_channel *mgmt_io_ch;
1674 struct spdk_bdev_mgmt_channel *mgmt_ch;
1675 struct spdk_bdev_shared_resource *shared_resource;
1676
1677 ch->bdev = bdev;
1678 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1679 if (!ch->channel) {
1680 return -1;
1681 }
1682
1683 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1684 if (!mgmt_io_ch) {
1685 return -1;
1686 }
1687
1688 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1689 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1690 if (shared_resource->shared_ch == ch->channel) {
1691 spdk_put_io_channel(mgmt_io_ch);
1692 shared_resource->ref++;
1693 break;
1694 }
1695 }
1696
1697 if (shared_resource == NULL) {
1698 shared_resource = calloc(1, sizeof(*shared_resource));
1699 if (shared_resource == NULL) {
1700 spdk_put_io_channel(mgmt_io_ch);
1701 return -1;
1702 }
1703
1704 shared_resource->mgmt_ch = mgmt_ch;
1705 shared_resource->io_outstanding = 0;
1706 TAILQ_INIT(&shared_resource->nomem_io);
1707 shared_resource->nomem_threshold = 0;
1708 shared_resource->shared_ch = ch->channel;
1709 shared_resource->ref = 1;
1710 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1711 }
1712
1713 memset(&ch->stat, 0, sizeof(ch->stat));
1714 ch->stat.ticks_rate = spdk_get_ticks_hz();
1715 ch->io_outstanding = 0;
1716 TAILQ_INIT(&ch->queued_resets);
1717 ch->flags = 0;
1718 ch->shared_resource = shared_resource;
1719
1720 #ifdef SPDK_CONFIG_VTUNE
1721 {
1722 char *name;
1723 __itt_init_ittlib(NULL, 0);
1724 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1725 if (!name) {
1726 _spdk_bdev_channel_destroy_resource(ch);
1727 return -1;
1728 }
1729 ch->handle = __itt_string_handle_create(name);
1730 free(name);
1731 ch->start_tsc = spdk_get_ticks();
1732 ch->interval_tsc = spdk_get_ticks_hz() / 100;
1733 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1734 }
1735 #endif
1736
1737 pthread_mutex_lock(&bdev->internal.mutex);
1738 _spdk_bdev_enable_qos(bdev, ch);
1739 pthread_mutex_unlock(&bdev->internal.mutex);
1740
1741 return 0;
1742 }
1743
1744 /*
1745 * Abort I/O that are waiting on a data buffer. These types of I/O are
1746 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1747 */
1748 static void
1749 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1750 {
1751 bdev_io_stailq_t tmp;
1752 struct spdk_bdev_io *bdev_io;
1753
1754 STAILQ_INIT(&tmp);
1755
1756 while (!STAILQ_EMPTY(queue)) {
1757 bdev_io = STAILQ_FIRST(queue);
1758 STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1759 if (bdev_io->internal.ch == ch) {
1760 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1761 } else {
1762 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1763 }
1764 }
1765
1766 STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1767 }
1768
1769 /*
1770 * Abort I/O that are queued waiting for submission. These types of I/O are
1771 * linked using the spdk_bdev_io link TAILQ_ENTRY.
1772 */
1773 static void
1774 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1775 {
1776 struct spdk_bdev_io *bdev_io, *tmp;
1777
1778 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1779 if (bdev_io->internal.ch == ch) {
1780 TAILQ_REMOVE(queue, bdev_io, internal.link);
1781 /*
1782 * spdk_bdev_io_complete() assumes that the completed I/O had
1783 * been submitted to the bdev module. Since in this case it
1784 * hadn't, bump io_outstanding to account for the decrement
1785 * that spdk_bdev_io_complete() will do.
1786 */
1787 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1788 ch->io_outstanding++;
1789 ch->shared_resource->io_outstanding++;
1790 }
1791 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1792 }
1793 }
1794 }
1795
1796 static void
1797 spdk_bdev_qos_channel_destroy(void *cb_arg)
1798 {
1799 struct spdk_bdev_qos *qos = cb_arg;
1800
1801 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1802 spdk_poller_unregister(&qos->poller);
1803
1804 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1805
1806 free(qos);
1807 }
1808
1809 static int
1810 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1811 {
1812 int i;
1813
1814 /*
1815 * Cleanly shutting down the QoS poller is tricky, because
1816 * during the asynchronous operation the user could open
1817 * a new descriptor and create a new channel, spawning
1818 * a new QoS poller.
1819 *
1820 * The strategy is to create a new QoS structure here and swap it
1821 * in. The shutdown path then continues to refer to the old one
1822 * until it completes and then releases it.
1823 */
1824 struct spdk_bdev_qos *new_qos, *old_qos;
1825
1826 old_qos = bdev->internal.qos;
1827
1828 new_qos = calloc(1, sizeof(*new_qos));
1829 if (!new_qos) {
1830 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1831 return -ENOMEM;
1832 }
1833
1834 /* Copy the old QoS data into the newly allocated structure */
1835 memcpy(new_qos, old_qos, sizeof(*new_qos));
1836
1837 /* Zero out the key parts of the QoS structure */
1838 new_qos->ch = NULL;
1839 new_qos->thread = NULL;
1840 new_qos->poller = NULL;
1841 TAILQ_INIT(&new_qos->queued);
1842 /*
1843 * The limit member of spdk_bdev_qos_limit structure is not zeroed.
1844 * It will be used later for the new QoS structure.
1845 */
1846 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1847 new_qos->rate_limits[i].remaining_this_timeslice = 0;
1848 new_qos->rate_limits[i].min_per_timeslice = 0;
1849 new_qos->rate_limits[i].max_per_timeslice = 0;
1850 }
1851
1852 bdev->internal.qos = new_qos;
1853
1854 if (old_qos->thread == NULL) {
1855 free(old_qos);
1856 } else {
1857 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1858 old_qos);
1859 }
1860
1861 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1862 * been destroyed yet. The destruction path will end up waiting for the final
1863 * channel to be put before it releases resources. */
1864
1865 return 0;
1866 }
1867
1868 static void
1869 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
1870 {
1871 total->bytes_read += add->bytes_read;
1872 total->num_read_ops += add->num_read_ops;
1873 total->bytes_written += add->bytes_written;
1874 total->num_write_ops += add->num_write_ops;
1875 total->read_latency_ticks += add->read_latency_ticks;
1876 total->write_latency_ticks += add->write_latency_ticks;
1877 }
1878
1879 static void
1880 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1881 {
1882 struct spdk_bdev_channel *ch = ctx_buf;
1883 struct spdk_bdev_mgmt_channel *mgmt_ch;
1884 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1885
1886 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1887 spdk_get_thread());
1888
1889 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
1890 pthread_mutex_lock(&ch->bdev->internal.mutex);
1891 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1892 pthread_mutex_unlock(&ch->bdev->internal.mutex);
1893
1894 mgmt_ch = shared_resource->mgmt_ch;
1895
1896 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1897 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1898 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1899 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1900
1901 _spdk_bdev_channel_destroy_resource(ch);
1902 }
1903
1904 int
1905 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1906 {
1907 struct spdk_bdev_alias *tmp;
1908
1909 if (alias == NULL) {
1910 SPDK_ERRLOG("NULL alias passed\n");
1911 return -EINVAL;
1912 }
1913
1914 if (spdk_bdev_get_by_name(alias)) {
1915 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1916 return -EEXIST;
1917 }
1918
1919 tmp = calloc(1, sizeof(*tmp));
1920 if (tmp == NULL) {
1921 SPDK_ERRLOG("Unable to allocate alias\n");
1922 return -ENOMEM;
1923 }
1924
1925 tmp->alias = strdup(alias);
1926 if (tmp->alias == NULL) {
1927 free(tmp);
1928 SPDK_ERRLOG("Unable to allocate alias\n");
1929 return -ENOMEM;
1930 }
1931
1932 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1933
1934 return 0;
1935 }
1936
1937 int
1938 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1939 {
1940 struct spdk_bdev_alias *tmp;
1941
1942 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1943 if (strcmp(alias, tmp->alias) == 0) {
1944 TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1945 free(tmp->alias);
1946 free(tmp);
1947 return 0;
1948 }
1949 }
1950
1951 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1952
1953 return -ENOENT;
1954 }
1955
1956 void
1957 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1958 {
1959 struct spdk_bdev_alias *p, *tmp;
1960
1961 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1962 TAILQ_REMOVE(&bdev->aliases, p, tailq);
1963 free(p->alias);
1964 free(p);
1965 }
1966 }
1967
1968 struct spdk_io_channel *
1969 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1970 {
1971 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1972 }
1973
1974 const char *
1975 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1976 {
1977 return bdev->name;
1978 }
1979
1980 const char *
1981 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1982 {
1983 return bdev->product_name;
1984 }
1985
1986 const struct spdk_bdev_aliases_list *
1987 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1988 {
1989 return &bdev->aliases;
1990 }
1991
1992 uint32_t
1993 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1994 {
1995 return bdev->blocklen;
1996 }
1997
1998 uint64_t
1999 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
2000 {
2001 return bdev->blockcnt;
2002 }
2003
2004 const char *
2005 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
2006 {
2007 return qos_rpc_type[type];
2008 }
2009
2010 void
2011 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
2012 {
2013 int i;
2014
2015 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
2016
2017 pthread_mutex_lock(&bdev->internal.mutex);
2018 if (bdev->internal.qos) {
2019 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
2020 if (bdev->internal.qos->rate_limits[i].limit !=
2021 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
2022 limits[i] = bdev->internal.qos->rate_limits[i].limit;
2023 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) {
2024 /* Convert from bytes to megabytes, which is the user-visible unit. */
2025 limits[i] = limits[i] / 1024 / 1024;
2026 }
2027 }
2028 }
2029 }
2030 pthread_mutex_unlock(&bdev->internal.mutex);
2031 }
2032
2033 size_t
2034 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
2035 {
2036 /* TODO: push this logic down to the bdev modules */
2037 if (bdev->need_aligned_buffer) {
2038 return bdev->blocklen;
2039 }
2040
2041 return 1;
2042 }
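
/*
 * Illustrative sketch (not part of the original source; sizes and names are hypothetical).
 * Data buffers handed to the read/write APIs should honor the alignment reported above,
 * for example by allocating them with spdk_dma_zmalloc() from spdk/env.h:
 *
 *	size_t align = spdk_bdev_get_buf_align(bdev);
 *	void *buf = spdk_dma_zmalloc(4096, align, NULL);
 *	if (buf == NULL) {
 *		// handle allocation failure
 *	}
 *	... submit I/O using buf ...
 *	spdk_dma_free(buf);
 */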
2043
2044 uint32_t
2045 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
2046 {
2047 return bdev->optimal_io_boundary;
2048 }
2049
2050 bool
2051 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
2052 {
2053 return bdev->write_cache;
2054 }
2055
2056 const struct spdk_uuid *
2057 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
2058 {
2059 return &bdev->uuid;
2060 }
2061
2062 uint64_t
2063 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
2064 {
2065 return bdev->internal.measured_queue_depth;
2066 }
2067
2068 uint64_t
2069 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
2070 {
2071 return bdev->internal.period;
2072 }
2073
2074 uint64_t
2075 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
2076 {
2077 return bdev->internal.weighted_io_time;
2078 }
2079
2080 uint64_t
2081 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
2082 {
2083 return bdev->internal.io_time;
2084 }
2085
2086 static void
2087 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
2088 {
2089 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2090
2091 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
2092
2093 if (bdev->internal.measured_queue_depth) {
2094 bdev->internal.io_time += bdev->internal.period;
2095 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
2096 }
2097 }
2098
2099 static void
2100 _calculate_measured_qd(struct spdk_io_channel_iter *i)
2101 {
2102 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2103 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2104 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
2105
2106 bdev->internal.temporary_queue_depth += ch->io_outstanding;
2107 spdk_for_each_channel_continue(i, 0);
2108 }
2109
2110 static int
2111 spdk_bdev_calculate_measured_queue_depth(void *ctx)
2112 {
2113 struct spdk_bdev *bdev = ctx;
2114 bdev->internal.temporary_queue_depth = 0;
2115 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
2116 _calculate_measured_qd_cpl);
2117 return 0;
2118 }
2119
2120 void
2121 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
2122 {
2123 bdev->internal.period = period;
2124
2125 if (bdev->internal.qd_poller != NULL) {
2126 spdk_poller_unregister(&bdev->internal.qd_poller);
2127 bdev->internal.measured_queue_depth = UINT64_MAX;
2128 }
2129
2130 if (period != 0) {
2131 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
2132 period);
2133 }
2134 }
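
/*
 * Illustrative note (not part of the original source): the period is handed straight to
 * spdk_poller_register(), so it is expressed in microseconds. A caller that wants the
 * queue depth refreshed once per second might do:
 *
 *	spdk_bdev_set_qd_sampling_period(bdev, 1000 * 1000);
 *	...
 *	uint64_t qd = spdk_bdev_get_qd(bdev);   // may read as UINT64_MAX until the first sample completes
 *
 * Passing 0 disables sampling again.
 */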
2135
2136 int
2137 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
2138 {
2139 int ret;
2140
2141 pthread_mutex_lock(&bdev->internal.mutex);
2142
2143 /* bdev has open descriptors */
2144 if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
2145 bdev->blockcnt > size) {
2146 ret = -EBUSY;
2147 } else {
2148 bdev->blockcnt = size;
2149 ret = 0;
2150 }
2151
2152 pthread_mutex_unlock(&bdev->internal.mutex);
2153
2154 return ret;
2155 }
2156
2157 /*
2158 * Convert I/O offset and length from bytes to blocks.
2159 *
2160 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
2161 */
2162 static uint64_t
2163 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
2164 uint64_t num_bytes, uint64_t *num_blocks)
2165 {
2166 uint32_t block_size = bdev->blocklen;
2167
2168 *offset_blocks = offset_bytes / block_size;
2169 *num_blocks = num_bytes / block_size;
2170
2171 return (offset_bytes % block_size) | (num_bytes % block_size);
2172 }
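
/*
 * Worked example (illustrative): with a 512-byte block size, offset_bytes = 1048576 and
 * num_bytes = 4096 yield *offset_blocks = 2048 and *num_blocks = 8, and the function
 * returns 0. If either byte value is not a multiple of 512 (say num_bytes = 4100), the OR
 * of the remainders is non-zero and the callers below treat it as -EINVAL.
 */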
2173
2174 static bool
2175 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
2176 {
2177 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
2178 * that the addition has overflowed and the offset has wrapped around */
2179 if (offset_blocks + num_blocks < offset_blocks) {
2180 return false;
2181 }
2182
2183 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
2184 if (offset_blocks + num_blocks > bdev->blockcnt) {
2185 return false;
2186 }
2187
2188 return true;
2189 }
2190
2191 int
2192 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2193 void *buf, uint64_t offset, uint64_t nbytes,
2194 spdk_bdev_io_completion_cb cb, void *cb_arg)
2195 {
2196 uint64_t offset_blocks, num_blocks;
2197
2198 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2199 return -EINVAL;
2200 }
2201
2202 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2203 }
2204
2205 int
2206 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2207 void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2208 spdk_bdev_io_completion_cb cb, void *cb_arg)
2209 {
2210 struct spdk_bdev *bdev = desc->bdev;
2211 struct spdk_bdev_io *bdev_io;
2212 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2213
2214 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2215 return -EINVAL;
2216 }
2217
2218 bdev_io = spdk_bdev_get_io(channel);
2219 if (!bdev_io) {
2220 return -ENOMEM;
2221 }
2222
2223 bdev_io->internal.ch = channel;
2224 bdev_io->internal.desc = desc;
2225 bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2226 bdev_io->u.bdev.iovs = &bdev_io->iov;
2227 bdev_io->u.bdev.iovs[0].iov_base = buf;
2228 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2229 bdev_io->u.bdev.iovcnt = 1;
2230 bdev_io->u.bdev.num_blocks = num_blocks;
2231 bdev_io->u.bdev.offset_blocks = offset_blocks;
2232 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2233
2234 spdk_bdev_io_submit(bdev_io);
2235 return 0;
2236 }
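
/*
 * Illustrative usage sketch (not part of the original source; names are hypothetical).
 * Submitting a single-block read and releasing the spdk_bdev_io in the completion callback:
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		// consume the data on success, then release the I/O
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, 0, 1, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// no spdk_bdev_io available right now; see spdk_bdev_queue_io_wait() below
 *	}
 */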
2237
2238 int
2239 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2240 struct iovec *iov, int iovcnt,
2241 uint64_t offset, uint64_t nbytes,
2242 spdk_bdev_io_completion_cb cb, void *cb_arg)
2243 {
2244 uint64_t offset_blocks, num_blocks;
2245
2246 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2247 return -EINVAL;
2248 }
2249
2250 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2251 }
2252
2253 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2254 struct iovec *iov, int iovcnt,
2255 uint64_t offset_blocks, uint64_t num_blocks,
2256 spdk_bdev_io_completion_cb cb, void *cb_arg)
2257 {
2258 struct spdk_bdev *bdev = desc->bdev;
2259 struct spdk_bdev_io *bdev_io;
2260 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2261
2262 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2263 return -EINVAL;
2264 }
2265
2266 bdev_io = spdk_bdev_get_io(channel);
2267 if (!bdev_io) {
2268 return -ENOMEM;
2269 }
2270
2271 bdev_io->internal.ch = channel;
2272 bdev_io->internal.desc = desc;
2273 bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2274 bdev_io->u.bdev.iovs = iov;
2275 bdev_io->u.bdev.iovcnt = iovcnt;
2276 bdev_io->u.bdev.num_blocks = num_blocks;
2277 bdev_io->u.bdev.offset_blocks = offset_blocks;
2278 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2279
2280 spdk_bdev_io_submit(bdev_io);
2281 return 0;
2282 }
2283
2284 int
2285 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2286 void *buf, uint64_t offset, uint64_t nbytes,
2287 spdk_bdev_io_completion_cb cb, void *cb_arg)
2288 {
2289 uint64_t offset_blocks, num_blocks;
2290
2291 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2292 return -EINVAL;
2293 }
2294
2295 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2296 }
2297
2298 int
2299 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2300 void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2301 spdk_bdev_io_completion_cb cb, void *cb_arg)
2302 {
2303 struct spdk_bdev *bdev = desc->bdev;
2304 struct spdk_bdev_io *bdev_io;
2305 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2306
2307 if (!desc->write) {
2308 return -EBADF;
2309 }
2310
2311 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2312 return -EINVAL;
2313 }
2314
2315 bdev_io = spdk_bdev_get_io(channel);
2316 if (!bdev_io) {
2317 return -ENOMEM;
2318 }
2319
2320 bdev_io->internal.ch = channel;
2321 bdev_io->internal.desc = desc;
2322 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2323 bdev_io->u.bdev.iovs = &bdev_io->iov;
2324 bdev_io->u.bdev.iovs[0].iov_base = buf;
2325 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2326 bdev_io->u.bdev.iovcnt = 1;
2327 bdev_io->u.bdev.num_blocks = num_blocks;
2328 bdev_io->u.bdev.offset_blocks = offset_blocks;
2329 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2330
2331 spdk_bdev_io_submit(bdev_io);
2332 return 0;
2333 }
2334
2335 int
2336 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2337 struct iovec *iov, int iovcnt,
2338 uint64_t offset, uint64_t len,
2339 spdk_bdev_io_completion_cb cb, void *cb_arg)
2340 {
2341 uint64_t offset_blocks, num_blocks;
2342
2343 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2344 return -EINVAL;
2345 }
2346
2347 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2348 }
2349
2350 int
2351 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2352 struct iovec *iov, int iovcnt,
2353 uint64_t offset_blocks, uint64_t num_blocks,
2354 spdk_bdev_io_completion_cb cb, void *cb_arg)
2355 {
2356 struct spdk_bdev *bdev = desc->bdev;
2357 struct spdk_bdev_io *bdev_io;
2358 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2359
2360 if (!desc->write) {
2361 return -EBADF;
2362 }
2363
2364 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2365 return -EINVAL;
2366 }
2367
2368 bdev_io = spdk_bdev_get_io(channel);
2369 if (!bdev_io) {
2370 return -ENOMEM;
2371 }
2372
2373 bdev_io->internal.ch = channel;
2374 bdev_io->internal.desc = desc;
2375 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2376 bdev_io->u.bdev.iovs = iov;
2377 bdev_io->u.bdev.iovcnt = iovcnt;
2378 bdev_io->u.bdev.num_blocks = num_blocks;
2379 bdev_io->u.bdev.offset_blocks = offset_blocks;
2380 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2381
2382 spdk_bdev_io_submit(bdev_io);
2383 return 0;
2384 }
2385
2386 int
2387 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2388 uint64_t offset, uint64_t len,
2389 spdk_bdev_io_completion_cb cb, void *cb_arg)
2390 {
2391 uint64_t offset_blocks, num_blocks;
2392
2393 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2394 return -EINVAL;
2395 }
2396
2397 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2398 }
2399
2400 int
2401 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2402 uint64_t offset_blocks, uint64_t num_blocks,
2403 spdk_bdev_io_completion_cb cb, void *cb_arg)
2404 {
2405 struct spdk_bdev *bdev = desc->bdev;
2406 struct spdk_bdev_io *bdev_io;
2407 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2408
2409 if (!desc->write) {
2410 return -EBADF;
2411 }
2412
2413 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2414 return -EINVAL;
2415 }
2416
2417 bdev_io = spdk_bdev_get_io(channel);
2418
2419 if (!bdev_io) {
2420 return -ENOMEM;
2421 }
2422
2423 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2424 bdev_io->internal.ch = channel;
2425 bdev_io->internal.desc = desc;
2426 bdev_io->u.bdev.offset_blocks = offset_blocks;
2427 bdev_io->u.bdev.num_blocks = num_blocks;
2428 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2429
2430 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2431 spdk_bdev_io_submit(bdev_io);
2432 return 0;
2433 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2434 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2435 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2436 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2437 _spdk_bdev_write_zero_buffer_next(bdev_io);
2438 return 0;
2439 } else {
2440 spdk_bdev_free_io(bdev_io);
2441 return -ENOTSUP;
2442 }
2443 }
2444
2445 int
2446 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2447 uint64_t offset, uint64_t nbytes,
2448 spdk_bdev_io_completion_cb cb, void *cb_arg)
2449 {
2450 uint64_t offset_blocks, num_blocks;
2451
2452 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2453 return -EINVAL;
2454 }
2455
2456 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2457 }
2458
2459 int
2460 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2461 uint64_t offset_blocks, uint64_t num_blocks,
2462 spdk_bdev_io_completion_cb cb, void *cb_arg)
2463 {
2464 struct spdk_bdev *bdev = desc->bdev;
2465 struct spdk_bdev_io *bdev_io;
2466 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2467
2468 if (!desc->write) {
2469 return -EBADF;
2470 }
2471
2472 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2473 return -EINVAL;
2474 }
2475
2476 if (num_blocks == 0) {
2477 SPDK_ERRLOG("Can't unmap 0 bytes\n");
2478 return -EINVAL;
2479 }
2480
2481 bdev_io = spdk_bdev_get_io(channel);
2482 if (!bdev_io) {
2483 return -ENOMEM;
2484 }
2485
2486 bdev_io->internal.ch = channel;
2487 bdev_io->internal.desc = desc;
2488 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2489
2490 bdev_io->u.bdev.iovs = &bdev_io->iov;
2491 bdev_io->u.bdev.iovs[0].iov_base = NULL;
2492 bdev_io->u.bdev.iovs[0].iov_len = 0;
2493 bdev_io->u.bdev.iovcnt = 1;
2494
2495 bdev_io->u.bdev.offset_blocks = offset_blocks;
2496 bdev_io->u.bdev.num_blocks = num_blocks;
2497 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2498
2499 spdk_bdev_io_submit(bdev_io);
2500 return 0;
2501 }
2502
2503 int
2504 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2505 uint64_t offset, uint64_t length,
2506 spdk_bdev_io_completion_cb cb, void *cb_arg)
2507 {
2508 uint64_t offset_blocks, num_blocks;
2509
2510 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2511 return -EINVAL;
2512 }
2513
2514 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2515 }
2516
2517 int
2518 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2519 uint64_t offset_blocks, uint64_t num_blocks,
2520 spdk_bdev_io_completion_cb cb, void *cb_arg)
2521 {
2522 struct spdk_bdev *bdev = desc->bdev;
2523 struct spdk_bdev_io *bdev_io;
2524 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2525
2526 if (!desc->write) {
2527 return -EBADF;
2528 }
2529
2530 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2531 return -EINVAL;
2532 }
2533
2534 bdev_io = spdk_bdev_get_io(channel);
2535 if (!bdev_io) {
2536 return -ENOMEM;
2537 }
2538
2539 bdev_io->internal.ch = channel;
2540 bdev_io->internal.desc = desc;
2541 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2542 bdev_io->u.bdev.iovs = NULL;
2543 bdev_io->u.bdev.iovcnt = 0;
2544 bdev_io->u.bdev.offset_blocks = offset_blocks;
2545 bdev_io->u.bdev.num_blocks = num_blocks;
2546 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2547
2548 spdk_bdev_io_submit(bdev_io);
2549 return 0;
2550 }
2551
2552 static void
2553 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2554 {
2555 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2556 struct spdk_bdev_io *bdev_io;
2557
2558 bdev_io = TAILQ_FIRST(&ch->queued_resets);
2559 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2560 spdk_bdev_io_submit_reset(bdev_io);
2561 }
2562
2563 static void
2564 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2565 {
2566 struct spdk_io_channel *ch;
2567 struct spdk_bdev_channel *channel;
2568 struct spdk_bdev_mgmt_channel *mgmt_channel;
2569 struct spdk_bdev_shared_resource *shared_resource;
2570 bdev_io_tailq_t tmp_queued;
2571
2572 TAILQ_INIT(&tmp_queued);
2573
2574 ch = spdk_io_channel_iter_get_channel(i);
2575 channel = spdk_io_channel_get_ctx(ch);
2576 shared_resource = channel->shared_resource;
2577 mgmt_channel = shared_resource->mgmt_ch;
2578
2579 channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2580
2581 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2582 /* The QoS object is always valid and readable while
2583 * the channel flag is set, so the lock here should not
2584 * be necessary. We're not in the fast path though, so
2585 * just take it anyway. */
2586 pthread_mutex_lock(&channel->bdev->internal.mutex);
2587 if (channel->bdev->internal.qos->ch == channel) {
2588 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2589 }
2590 pthread_mutex_unlock(&channel->bdev->internal.mutex);
2591 }
2592
2593 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2594 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2595 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2596 _spdk_bdev_abort_queued_io(&tmp_queued, channel);
2597
2598 spdk_for_each_channel_continue(i, 0);
2599 }
2600
2601 static void
2602 _spdk_bdev_start_reset(void *ctx)
2603 {
2604 struct spdk_bdev_channel *ch = ctx;
2605
2606 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2607 ch, _spdk_bdev_reset_dev);
2608 }
2609
2610 static void
2611 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2612 {
2613 struct spdk_bdev *bdev = ch->bdev;
2614
2615 assert(!TAILQ_EMPTY(&ch->queued_resets));
2616
2617 pthread_mutex_lock(&bdev->internal.mutex);
2618 if (bdev->internal.reset_in_progress == NULL) {
2619 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2620 /*
2621 * Take a channel reference for the target bdev for the life of this
2622 * reset. This guards against the channel getting destroyed while
2623 * spdk_for_each_channel() calls related to this reset IO are in
2624 * progress. We will release the reference when this reset is
2625 * completed.
2626 */
2627 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2628 _spdk_bdev_start_reset(ch);
2629 }
2630 pthread_mutex_unlock(&bdev->internal.mutex);
2631 }
2632
2633 int
2634 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2635 spdk_bdev_io_completion_cb cb, void *cb_arg)
2636 {
2637 struct spdk_bdev *bdev = desc->bdev;
2638 struct spdk_bdev_io *bdev_io;
2639 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2640
2641 bdev_io = spdk_bdev_get_io(channel);
2642 if (!bdev_io) {
2643 return -ENOMEM;
2644 }
2645
2646 bdev_io->internal.ch = channel;
2647 bdev_io->internal.desc = desc;
2648 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2649 bdev_io->u.reset.ch_ref = NULL;
2650 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2651
2652 pthread_mutex_lock(&bdev->internal.mutex);
2653 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2654 pthread_mutex_unlock(&bdev->internal.mutex);
2655
2656 _spdk_bdev_channel_start_reset(channel);
2657
2658 return 0;
2659 }
2660
2661 void
2662 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2663 struct spdk_bdev_io_stat *stat)
2664 {
2665 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2666
2667 *stat = channel->stat;
2668 }
2669
2670 static void
2671 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2672 {
2673 void *io_device = spdk_io_channel_iter_get_io_device(i);
2674 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2675
2676 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2677 bdev_iostat_ctx->cb_arg, 0);
2678 free(bdev_iostat_ctx);
2679 }
2680
2681 static void
2682 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2683 {
2684 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2685 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2686 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2687
2688 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
2689 spdk_for_each_channel_continue(i, 0);
2690 }
2691
2692 void
2693 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2694 spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2695 {
2696 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2697
2698 assert(bdev != NULL);
2699 assert(stat != NULL);
2700 assert(cb != NULL);
2701
2702 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2703 if (bdev_iostat_ctx == NULL) {
2704 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2705 cb(bdev, stat, cb_arg, -ENOMEM);
2706 return;
2707 }
2708
2709 bdev_iostat_ctx->stat = stat;
2710 bdev_iostat_ctx->cb = cb;
2711 bdev_iostat_ctx->cb_arg = cb_arg;
2712
2713 /* Start with the statistics from previously deleted channels. */
2714 pthread_mutex_lock(&bdev->internal.mutex);
2715 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
2716 pthread_mutex_unlock(&bdev->internal.mutex);
2717
2718 /* Then iterate and add the statistics from each existing channel. */
2719 spdk_for_each_channel(__bdev_to_io_dev(bdev),
2720 _spdk_bdev_get_each_channel_stat,
2721 bdev_iostat_ctx,
2722 _spdk_bdev_get_device_stat_done);
2723 }
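
/*
 * Illustrative usage sketch (not part of the original source; names are hypothetical).
 * The aggregated statistics arrive asynchronously, so the caller supplies storage that
 * stays valid until the callback fires:
 *
 *	static void
 *	stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("%s: %" PRIu64 " reads, %" PRIu64 " writes\n",
 *			       spdk_bdev_get_name(bdev), stat->num_read_ops, stat->num_write_ops);
 *		}
 *		free(stat);
 *	}
 *
 *	struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
 *	spdk_bdev_get_device_stat(bdev, stat, stat_done, NULL);
 */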
2724
2725 int
2726 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2727 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2728 spdk_bdev_io_completion_cb cb, void *cb_arg)
2729 {
2730 struct spdk_bdev *bdev = desc->bdev;
2731 struct spdk_bdev_io *bdev_io;
2732 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2733
2734 if (!desc->write) {
2735 return -EBADF;
2736 }
2737
2738 bdev_io = spdk_bdev_get_io(channel);
2739 if (!bdev_io) {
2740 return -ENOMEM;
2741 }
2742
2743 bdev_io->internal.ch = channel;
2744 bdev_io->internal.desc = desc;
2745 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2746 bdev_io->u.nvme_passthru.cmd = *cmd;
2747 bdev_io->u.nvme_passthru.buf = buf;
2748 bdev_io->u.nvme_passthru.nbytes = nbytes;
2749 bdev_io->u.nvme_passthru.md_buf = NULL;
2750 bdev_io->u.nvme_passthru.md_len = 0;
2751
2752 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2753
2754 spdk_bdev_io_submit(bdev_io);
2755 return 0;
2756 }
2757
2758 int
2759 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2760 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2761 spdk_bdev_io_completion_cb cb, void *cb_arg)
2762 {
2763 struct spdk_bdev *bdev = desc->bdev;
2764 struct spdk_bdev_io *bdev_io;
2765 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2766
2767 if (!desc->write) {
2768 /*
2769 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2770 * to easily determine if the command is a read or write, but for now just
2771 * do not allow io_passthru with a read-only descriptor.
2772 */
2773 return -EBADF;
2774 }
2775
2776 bdev_io = spdk_bdev_get_io(channel);
2777 if (!bdev_io) {
2778 return -ENOMEM;
2779 }
2780
2781 bdev_io->internal.ch = channel;
2782 bdev_io->internal.desc = desc;
2783 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2784 bdev_io->u.nvme_passthru.cmd = *cmd;
2785 bdev_io->u.nvme_passthru.buf = buf;
2786 bdev_io->u.nvme_passthru.nbytes = nbytes;
2787 bdev_io->u.nvme_passthru.md_buf = NULL;
2788 bdev_io->u.nvme_passthru.md_len = 0;
2789
2790 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2791
2792 spdk_bdev_io_submit(bdev_io);
2793 return 0;
2794 }
2795
2796 int
2797 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2798 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2799 spdk_bdev_io_completion_cb cb, void *cb_arg)
2800 {
2801 struct spdk_bdev *bdev = desc->bdev;
2802 struct spdk_bdev_io *bdev_io;
2803 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2804
2805 if (!desc->write) {
2806 /*
2807 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2808 * to easily determine if the command is a read or write, but for now just
2809 * do not allow io_passthru with a read-only descriptor.
2810 */
2811 return -EBADF;
2812 }
2813
2814 bdev_io = spdk_bdev_get_io(channel);
2815 if (!bdev_io) {
2816 return -ENOMEM;
2817 }
2818
2819 bdev_io->internal.ch = channel;
2820 bdev_io->internal.desc = desc;
2821 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2822 bdev_io->u.nvme_passthru.cmd = *cmd;
2823 bdev_io->u.nvme_passthru.buf = buf;
2824 bdev_io->u.nvme_passthru.nbytes = nbytes;
2825 bdev_io->u.nvme_passthru.md_buf = md_buf;
2826 bdev_io->u.nvme_passthru.md_len = md_len;
2827
2828 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2829
2830 spdk_bdev_io_submit(bdev_io);
2831 return 0;
2832 }
2833
2834 int
2835 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2836 struct spdk_bdev_io_wait_entry *entry)
2837 {
2838 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2839 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2840
2841 if (bdev != entry->bdev) {
2842 SPDK_ERRLOG("bdevs do not match\n");
2843 return -EINVAL;
2844 }
2845
2846 if (mgmt_ch->per_thread_cache_count > 0) {
2847 SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2848 return -EINVAL;
2849 }
2850
2851 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2852 return 0;
2853 }
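
/*
 * Illustrative usage sketch (not part of the original source; my_io_ctx and the field
 * names in it are hypothetical). When a submission call such as spdk_bdev_read_blocks()
 * returns -ENOMEM, the caller can park a wait entry here and retry from the callback once
 * a spdk_bdev_io is freed on this thread:
 *
 *	static void
 *	retry_read(void *arg)
 *	{
 *		struct my_io_ctx *ctx = arg;
 *
 *		// resubmit the same request; queue the entry again if it still returns -ENOMEM
 *		...
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset, num, read_done, ctx);
 *	if (rc == -ENOMEM) {
 *		ctx->wait_entry.bdev = spdk_bdev_desc_get_bdev(desc);
 *		ctx->wait_entry.cb_fn = retry_read;
 *		ctx->wait_entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(ctx->wait_entry.bdev, io_ch, &ctx->wait_entry);
 *	}
 */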
2854
2855 static void
2856 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2857 {
2858 struct spdk_bdev *bdev = bdev_ch->bdev;
2859 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2860 struct spdk_bdev_io *bdev_io;
2861
2862 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2863 /*
2864 * Allow some more I/O to complete before retrying the nomem_io queue.
2865 * Some drivers (such as nvme) cannot immediately take a new I/O in
2866 * the context of a completion, because the resources for the I/O are
2867 * not released until control returns to the bdev poller. Also, we
2868 * may require several small I/O to complete before a larger I/O
2869 * (that requires splitting) can be submitted.
2870 */
2871 return;
2872 }
2873
2874 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2875 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2876 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2877 bdev_io->internal.ch->io_outstanding++;
2878 shared_resource->io_outstanding++;
2879 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2880 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2881 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2882 break;
2883 }
2884 }
2885 }
2886
2887 static inline void
2888 _spdk_bdev_io_complete(void *ctx)
2889 {
2890 struct spdk_bdev_io *bdev_io = ctx;
2891 uint64_t tsc;
2892
2893 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2894 /*
2895 * Send the completion to the thread that originally submitted the I/O,
2896 * which may not be the current thread in the case of QoS.
2897 */
2898 if (bdev_io->internal.io_submit_ch) {
2899 bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2900 bdev_io->internal.io_submit_ch = NULL;
2901 }
2902
2903 /*
2904 * Defer completion to avoid potential infinite recursion if the
2905 * user's completion callback issues a new I/O.
2906 */
2907 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2908 _spdk_bdev_io_complete, bdev_io);
2909 return;
2910 }
2911
2912 tsc = spdk_get_ticks();
2913 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
2914
2915 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2916 switch (bdev_io->type) {
2917 case SPDK_BDEV_IO_TYPE_READ:
2918 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2919 bdev_io->internal.ch->stat.num_read_ops++;
2920 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2921 break;
2922 case SPDK_BDEV_IO_TYPE_WRITE:
2923 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2924 bdev_io->internal.ch->stat.num_write_ops++;
2925 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2926 break;
2927 default:
2928 break;
2929 }
2930 }
2931
2932 #ifdef SPDK_CONFIG_VTUNE
2933 uint64_t now_tsc = spdk_get_ticks();
2934 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2935 uint64_t data[5];
2936
2937 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2938 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2939 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2940 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2941 data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2942 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2943
2944 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2945 __itt_metadata_u64, 5, data);
2946
2947 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2948 bdev_io->internal.ch->start_tsc = now_tsc;
2949 }
2950 #endif
2951
2952 assert(bdev_io->internal.cb != NULL);
2953 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2954
2955 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2956 bdev_io->internal.caller_ctx);
2957 }
2958
2959 static void
2960 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2961 {
2962 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2963
2964 if (bdev_io->u.reset.ch_ref != NULL) {
2965 spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2966 bdev_io->u.reset.ch_ref = NULL;
2967 }
2968
2969 _spdk_bdev_io_complete(bdev_io);
2970 }
2971
2972 static void
2973 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2974 {
2975 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2976 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2977
2978 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2979 if (!TAILQ_EMPTY(&ch->queued_resets)) {
2980 _spdk_bdev_channel_start_reset(ch);
2981 }
2982
2983 spdk_for_each_channel_continue(i, 0);
2984 }
2985
2986 void
2987 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2988 {
2989 struct spdk_bdev *bdev = bdev_io->bdev;
2990 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2991 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2992
2993 bdev_io->internal.status = status;
2994
2995 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2996 bool unlock_channels = false;
2997
2998 if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2999 SPDK_ERRLOG("NOMEM returned for reset\n");
3000 }
3001 pthread_mutex_lock(&bdev->internal.mutex);
3002 if (bdev_io == bdev->internal.reset_in_progress) {
3003 bdev->internal.reset_in_progress = NULL;
3004 unlock_channels = true;
3005 }
3006 pthread_mutex_unlock(&bdev->internal.mutex);
3007
3008 if (unlock_channels) {
3009 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
3010 bdev_io, _spdk_bdev_reset_complete);
3011 return;
3012 }
3013 } else {
3014 assert(bdev_ch->io_outstanding > 0);
3015 assert(shared_resource->io_outstanding > 0);
3016 bdev_ch->io_outstanding--;
3017 shared_resource->io_outstanding--;
3018
3019 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
3020 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
3021 /*
3022 * Wait for some of the outstanding I/O to complete before we
3023 * retry any of the nomem_io. Normally we will wait for
3024 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
3025 * depth channels we will instead wait for half to complete.
3026 */
3027 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
3028 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
3029 return;
3030 }
3031
3032 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
3033 _spdk_bdev_ch_retry_io(bdev_ch);
3034 }
3035 }
3036
3037 _spdk_bdev_io_complete(bdev_io);
3038 }
3039
3040 void
3041 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
3042 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
3043 {
3044 if (sc == SPDK_SCSI_STATUS_GOOD) {
3045 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3046 } else {
3047 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
3048 bdev_io->internal.error.scsi.sc = sc;
3049 bdev_io->internal.error.scsi.sk = sk;
3050 bdev_io->internal.error.scsi.asc = asc;
3051 bdev_io->internal.error.scsi.ascq = ascq;
3052 }
3053
3054 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3055 }
3056
3057 void
3058 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
3059 int *sc, int *sk, int *asc, int *ascq)
3060 {
3061 assert(sc != NULL);
3062 assert(sk != NULL);
3063 assert(asc != NULL);
3064 assert(ascq != NULL);
3065
3066 switch (bdev_io->internal.status) {
3067 case SPDK_BDEV_IO_STATUS_SUCCESS:
3068 *sc = SPDK_SCSI_STATUS_GOOD;
3069 *sk = SPDK_SCSI_SENSE_NO_SENSE;
3070 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3071 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3072 break;
3073 case SPDK_BDEV_IO_STATUS_NVME_ERROR:
3074 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
3075 break;
3076 case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
3077 *sc = bdev_io->internal.error.scsi.sc;
3078 *sk = bdev_io->internal.error.scsi.sk;
3079 *asc = bdev_io->internal.error.scsi.asc;
3080 *ascq = bdev_io->internal.error.scsi.ascq;
3081 break;
3082 default:
3083 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
3084 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
3085 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3086 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3087 break;
3088 }
3089 }
3090
3091 void
3092 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
3093 {
3094 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
3095 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3096 } else {
3097 bdev_io->internal.error.nvme.sct = sct;
3098 bdev_io->internal.error.nvme.sc = sc;
3099 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
3100 }
3101
3102 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3103 }
3104
3105 void
3106 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
3107 {
3108 assert(sct != NULL);
3109 assert(sc != NULL);
3110
3111 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
3112 *sct = bdev_io->internal.error.nvme.sct;
3113 *sc = bdev_io->internal.error.nvme.sc;
3114 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3115 *sct = SPDK_NVME_SCT_GENERIC;
3116 *sc = SPDK_NVME_SC_SUCCESS;
3117 } else {
3118 *sct = SPDK_NVME_SCT_GENERIC;
3119 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
3120 }
3121 }
3122
3123 struct spdk_thread *
3124 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
3125 {
3126 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
3127 }
3128
3129 static void
3130 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits)
3131 {
3132 uint64_t min_qos_set;
3133 int i;
3134
3135 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3136 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3137 break;
3138 }
3139 }
3140
3141 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3142 SPDK_ERRLOG("Invalid rate limits set.\n");
3143 return;
3144 }
3145
3146 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3147 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3148 continue;
3149 }
3150
3151 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3152 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3153 } else {
3154 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3155 }
3156
3157 if (limits[i] == 0 || limits[i] % min_qos_set) {
3158 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
3159 limits[i], bdev->name, min_qos_set);
3160 SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
3161 return;
3162 }
3163 }
3164
3165 if (!bdev->internal.qos) {
3166 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3167 if (!bdev->internal.qos) {
3168 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3169 return;
3170 }
3171 }
3172
3173 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3174 bdev->internal.qos->rate_limits[i].limit = limits[i];
3175 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
3176 bdev->name, i, limits[i]);
3177 }
3178
3179 return;
3180 }
3181
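/*
 * Illustrative note (not part of the original source): the parser below reads a "[QoS]"
 * section from the INI-style configuration file handled by spdk_conf. Based on the lookups
 * it performs, the section is expected to look roughly like this, with Limit_IOPS given in
 * I/Os per second and Limit_BPS in megabytes per second (the code multiplies by 1024 * 1024):
 *
 *	[QoS]
 *	  Limit_IOPS Malloc0 20000
 *	  Limit_BPS  Malloc0 100
 *
 * Values must also be non-zero multiples of the corresponding minimum
 * (SPDK_BDEV_QOS_MIN_IOS_PER_SEC / SPDK_BDEV_QOS_MIN_BYTES_PER_SEC), or the limit is
 * rejected in _spdk_bdev_qos_config_limit() above.
 */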
3182 static void
3183 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
3184 {
3185 struct spdk_conf_section *sp = NULL;
3186 const char *val = NULL;
3187 int i = 0, j = 0;
3188 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
3189 bool config_qos = false;
3190
3191 sp = spdk_conf_find_section(NULL, "QoS");
3192 if (!sp) {
3193 return;
3194 }
3195
3196 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3197 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3198
3199 i = 0;
3200 while (true) {
3201 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0);
3202 if (!val) {
3203 break;
3204 }
3205
3206 if (strcmp(bdev->name, val) != 0) {
3207 i++;
3208 continue;
3209 }
3210
3211 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1);
3212 if (val) {
3213 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) {
3214 limits[j] = strtoull(val, NULL, 10);
3215 } else {
3216 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024;
3217 }
3218 config_qos = true;
3219 }
3220
3221 break;
3222 }
3223
3224 j++;
3225 }
3226
3227 if (config_qos == true) {
3228 _spdk_bdev_qos_config_limit(bdev, limits);
3229 }
3230
3231 return;
3232 }
3233
3234 static int
3235 spdk_bdev_init(struct spdk_bdev *bdev)
3236 {
3237 char *bdev_name;
3238
3239 assert(bdev->module != NULL);
3240
3241 if (!bdev->name) {
3242 SPDK_ERRLOG("Bdev name is NULL\n");
3243 return -EINVAL;
3244 }
3245
3246 if (spdk_bdev_get_by_name(bdev->name)) {
3247 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3248 return -EEXIST;
3249 }
3250
3251 /* Users often register their own I/O devices using the bdev name. In
3252 * order to avoid conflicts, prepend bdev_. */
3253 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name);
3254 if (!bdev_name) {
3255 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n");
3256 return -ENOMEM;
3257 }
3258
3259 bdev->internal.status = SPDK_BDEV_STATUS_READY;
3260 bdev->internal.measured_queue_depth = UINT64_MAX;
3261 bdev->internal.claim_module = NULL;
3262 bdev->internal.qd_poller = NULL;
3263 bdev->internal.qos = NULL;
3264
3265 TAILQ_INIT(&bdev->internal.open_descs);
3266
3267 TAILQ_INIT(&bdev->aliases);
3268
3269 bdev->internal.reset_in_progress = NULL;
3270
3271 _spdk_bdev_qos_config(bdev);
3272
3273 spdk_io_device_register(__bdev_to_io_dev(bdev),
3274 spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3275 sizeof(struct spdk_bdev_channel),
3276 bdev_name);
3277
3278 free(bdev_name);
3279
3280 pthread_mutex_init(&bdev->internal.mutex, NULL);
3281 return 0;
3282 }
3283
3284 static void
3285 spdk_bdev_destroy_cb(void *io_device)
3286 {
3287 int rc;
3288 struct spdk_bdev *bdev;
3289 spdk_bdev_unregister_cb cb_fn;
3290 void *cb_arg;
3291
3292 bdev = __bdev_from_io_dev(io_device);
3293 cb_fn = bdev->internal.unregister_cb;
3294 cb_arg = bdev->internal.unregister_ctx;
3295
3296 rc = bdev->fn_table->destruct(bdev->ctxt);
3297 if (rc < 0) {
3298 SPDK_ERRLOG("destruct failed\n");
3299 }
3300 if (rc <= 0 && cb_fn != NULL) {
3301 cb_fn(cb_arg, rc);
3302 }
3303 }
3304
3305
3306 static void
3307 spdk_bdev_fini(struct spdk_bdev *bdev)
3308 {
3309 pthread_mutex_destroy(&bdev->internal.mutex);
3310
3311 free(bdev->internal.qos);
3312
3313 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3314 }
3315
3316 static void
3317 spdk_bdev_start(struct spdk_bdev *bdev)
3318 {
3319 struct spdk_bdev_module *module;
3320 uint32_t action;
3321
3322 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3323 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3324
3325 /* Examine configuration before initializing I/O */
3326 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3327 if (module->examine_config) {
3328 action = module->internal.action_in_progress;
3329 module->internal.action_in_progress++;
3330 module->examine_config(bdev);
3331 if (action != module->internal.action_in_progress) {
3332 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3333 module->name);
3334 }
3335 }
3336 }
3337
3338 if (bdev->internal.claim_module) {
3339 return;
3340 }
3341
3342 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3343 if (module->examine_disk) {
3344 module->internal.action_in_progress++;
3345 module->examine_disk(bdev);
3346 }
3347 }
3348 }
3349
3350 int
3351 spdk_bdev_register(struct spdk_bdev *bdev)
3352 {
3353 int rc = spdk_bdev_init(bdev);
3354
3355 if (rc == 0) {
3356 spdk_bdev_start(bdev);
3357 }
3358
3359 return rc;
3360 }
3361
3362 int
3363 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3364 {
3365 int rc;
3366
3367 rc = spdk_bdev_init(vbdev);
3368 if (rc) {
3369 return rc;
3370 }
3371
3372 spdk_bdev_start(vbdev);
3373 return 0;
3374 }
3375
3376 void
3377 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3378 {
3379 if (bdev->internal.unregister_cb != NULL) {
3380 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3381 }
3382 }
3383
3384 static void
3385 _remove_notify(void *arg)
3386 {
3387 struct spdk_bdev_desc *desc = arg;
3388
3389 desc->remove_scheduled = false;
3390
3391 if (desc->closed) {
3392 free(desc);
3393 } else {
3394 desc->remove_cb(desc->remove_ctx);
3395 }
3396 }
3397
3398 void
3399 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3400 {
3401 struct spdk_bdev_desc *desc, *tmp;
3402 bool do_destruct = true;
3403 struct spdk_thread *thread;
3404
3405 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3406
3407 thread = spdk_get_thread();
3408 if (!thread) {
3409 /* The user called this from a non-SPDK thread. */
3410 if (cb_fn != NULL) {
3411 cb_fn(cb_arg, -ENOTSUP);
3412 }
3413 return;
3414 }
3415
3416 pthread_mutex_lock(&bdev->internal.mutex);
3417
3418 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3419 bdev->internal.unregister_cb = cb_fn;
3420 bdev->internal.unregister_ctx = cb_arg;
3421
3422 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3423 if (desc->remove_cb) {
3424 do_destruct = false;
3425 /*
3426 * Defer invocation of the remove_cb to a separate message that will
3427 * run later on its thread. This ensures this context unwinds and
3428 * we don't recursively unregister this bdev again if the remove_cb
3429 * immediately closes its descriptor.
3430 */
3431 if (!desc->remove_scheduled) {
3432 /* Avoid scheduling removal of the same descriptor multiple times. */
3433 desc->remove_scheduled = true;
3434 spdk_thread_send_msg(desc->thread, _remove_notify, desc);
3435 }
3436 }
3437 }
3438
3439 if (!do_destruct) {
3440 pthread_mutex_unlock(&bdev->internal.mutex);
3441 return;
3442 }
3443
3444 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3445 pthread_mutex_unlock(&bdev->internal.mutex);
3446
3447 spdk_bdev_fini(bdev);
3448 }
3449
3450 int
3451 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3452 void *remove_ctx, struct spdk_bdev_desc **_desc)
3453 {
3454 struct spdk_bdev_desc *desc;
3455 struct spdk_thread *thread;
3456
3457 thread = spdk_get_thread();
3458 if (!thread) {
3459 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n");
3460 return -ENOTSUP;
3461 }
3462
3463 desc = calloc(1, sizeof(*desc));
3464 if (desc == NULL) {
3465 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3466 return -ENOMEM;
3467 }
3468
3469 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3470 spdk_get_thread());
3471
3472 pthread_mutex_lock(&bdev->internal.mutex);
3473
3474 if (write && bdev->internal.claim_module) {
3475 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3476 bdev->name, bdev->internal.claim_module->name);
3477 free(desc);
3478 pthread_mutex_unlock(&bdev->internal.mutex);
3479 return -EPERM;
3480 }
3481
3482 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3483
3484 desc->bdev = bdev;
3485 desc->thread = thread;
3486 desc->remove_cb = remove_cb;
3487 desc->remove_ctx = remove_ctx;
3488 desc->write = write;
3489 *_desc = desc;
3490
3491 pthread_mutex_unlock(&bdev->internal.mutex);
3492
3493 return 0;
3494 }
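
/*
 * Illustrative usage sketch (not part of the original source; the bdev name and error
 * handling are hypothetical). Typical open/close flow from an SPDK thread:
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *io_ch;
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *
 *	if (bdev == NULL || spdk_bdev_open(bdev, true, NULL, NULL, &desc) != 0) {
 *		// bdev missing, already claimed for write, or called off an SPDK thread
 *		return;
 *	}
 *	io_ch = spdk_bdev_get_io_channel(desc);
 *	... submit I/O on io_ch ...
 *	spdk_put_io_channel(io_ch);
 *	spdk_bdev_close(desc);
 */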
3495
3496 void
3497 spdk_bdev_close(struct spdk_bdev_desc *desc)
3498 {
3499 struct spdk_bdev *bdev = desc->bdev;
3500 bool do_unregister = false;
3501
3502 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3503 spdk_get_thread());
3504
3505 assert(desc->thread == spdk_get_thread());
3506
3507 pthread_mutex_lock(&bdev->internal.mutex);
3508
3509 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3510
3511 desc->closed = true;
3512
3513 if (!desc->remove_scheduled) {
3514 free(desc);
3515 }
3516
3517 /* If no more descriptors, kill QoS channel */
3518 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3519 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3520 bdev->name, spdk_get_thread());
3521
3522 if (spdk_bdev_qos_destroy(bdev)) {
3523 /* There isn't anything we can do to recover here. Just let the
3524 * old QoS poller keep running. The QoS handling won't change
3525 * cores when the user allocates a new channel, but it won't break. */
3526 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3527 }
3528 }
3529
3530 spdk_bdev_set_qd_sampling_period(bdev, 0);
3531
3532 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3533 do_unregister = true;
3534 }
3535 pthread_mutex_unlock(&bdev->internal.mutex);
3536
3537 if (do_unregister == true) {
3538 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3539 }
3540 }
3541
3542 int
3543 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3544 struct spdk_bdev_module *module)
3545 {
3546 if (bdev->internal.claim_module != NULL) {
3547 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3548 bdev->internal.claim_module->name);
3549 return -EPERM;
3550 }
3551
3552 if (desc && !desc->write) {
3553 desc->write = true;
3554 }
3555
3556 bdev->internal.claim_module = module;
3557 return 0;
3558 }
3559
3560 void
3561 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3562 {
3563 assert(bdev->internal.claim_module != NULL);
3564 bdev->internal.claim_module = NULL;
3565 }
3566
3567 struct spdk_bdev *
3568 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3569 {
3570 return desc->bdev;
3571 }
3572
3573 void
3574 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3575 {
3576 struct iovec *iovs;
3577 int iovcnt;
3578
3579 if (bdev_io == NULL) {
3580 return;
3581 }
3582
3583 switch (bdev_io->type) {
3584 case SPDK_BDEV_IO_TYPE_READ:
3585 iovs = bdev_io->u.bdev.iovs;
3586 iovcnt = bdev_io->u.bdev.iovcnt;
3587 break;
3588 case SPDK_BDEV_IO_TYPE_WRITE:
3589 iovs = bdev_io->u.bdev.iovs;
3590 iovcnt = bdev_io->u.bdev.iovcnt;
3591 break;
3592 default:
3593 iovs = NULL;
3594 iovcnt = 0;
3595 break;
3596 }
3597
3598 if (iovp) {
3599 *iovp = iovs;
3600 }
3601 if (iovcntp) {
3602 *iovcntp = iovcnt;
3603 }
3604 }
3605
3606 void
3607 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3608 {
3609
3610 if (spdk_bdev_module_list_find(bdev_module->name)) {
3611 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3612 assert(false);
3613 }
3614
3615 if (bdev_module->async_init) {
3616 bdev_module->internal.action_in_progress = 1;
3617 }
3618
3619 /*
3620 * Modules with examine callbacks must be initialized first, so they are
3621 * ready to handle examine callbacks from later modules that will
3622 * register physical bdevs.
3623 */
3624 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3625 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3626 } else {
3627 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3628 }
3629 }
3630
3631 struct spdk_bdev_module *
3632 spdk_bdev_module_list_find(const char *name)
3633 {
3634 struct spdk_bdev_module *bdev_module;
3635
3636 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3637 if (strcmp(name, bdev_module->name) == 0) {
3638 break;
3639 }
3640 }
3641
3642 return bdev_module;
3643 }
3644
3645 static void
3646 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
3647 {
3648 struct spdk_bdev_io *bdev_io = _bdev_io;
3649 uint64_t num_bytes, num_blocks;
3650 int rc;
3651
3652 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
3653 bdev_io->u.bdev.split_remaining_num_blocks,
3654 ZERO_BUFFER_SIZE);
3655 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
3656
3657 rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
3658 spdk_io_channel_from_ctx(bdev_io->internal.ch),
3659 g_bdev_mgr.zero_buffer,
3660 bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
3661 _spdk_bdev_write_zero_buffer_done, bdev_io);
3662 if (rc == 0) {
3663 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
3664 bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
3665 } else if (rc == -ENOMEM) {
3666 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
3667 } else {
3668 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3669 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
3670 }
3671 }
3672
3673 static void
3674 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3675 {
3676 struct spdk_bdev_io *parent_io = cb_arg;
3677
3678 spdk_bdev_free_io(bdev_io);
3679
3680 if (!success) {
3681 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3682 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
3683 return;
3684 }
3685
3686 if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
3687 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3688 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
3689 return;
3690 }
3691
3692 _spdk_bdev_write_zero_buffer_next(parent_io);
3693 }
3694
3695 struct set_qos_limit_ctx {
3696 void (*cb_fn)(void *cb_arg, int status);
3697 void *cb_arg;
3698 struct spdk_bdev *bdev;
3699 };
3700
3701 static void
3702 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3703 {
3704 pthread_mutex_lock(&ctx->bdev->internal.mutex);
3705 ctx->bdev->internal.qos_mod_in_progress = false;
3706 pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3707
3708 ctx->cb_fn(ctx->cb_arg, status);
3709 free(ctx);
3710 }
3711
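/*
 * QoS teardown path: spdk_bdev_set_qos_rate_limits() first runs
 * _spdk_bdev_disable_qos_msg() on every channel to clear BDEV_CH_QOS_ENABLED,
 * then _spdk_bdev_disable_qos_msg_done() forwards the context to the QoS
 * thread, where _spdk_bdev_disable_qos_done() resubmits any I/O still queued
 * by the rate limiter on its original thread, releases the QoS channel
 * reference, and unregisters the QoS poller.
 */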
3712 static void
3713 _spdk_bdev_disable_qos_done(void *cb_arg)
3714 {
3715 struct set_qos_limit_ctx *ctx = cb_arg;
3716 struct spdk_bdev *bdev = ctx->bdev;
3717 struct spdk_bdev_io *bdev_io;
3718 struct spdk_bdev_qos *qos;
3719
3720 pthread_mutex_lock(&bdev->internal.mutex);
3721 qos = bdev->internal.qos;
3722 bdev->internal.qos = NULL;
3723 pthread_mutex_unlock(&bdev->internal.mutex);
3724
3725 while (!TAILQ_EMPTY(&qos->queued)) {
3726 /* Send queued I/O back to their original thread for resubmission. */
3727 bdev_io = TAILQ_FIRST(&qos->queued);
3728 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3729
3730 if (bdev_io->internal.io_submit_ch) {
3731 /*
3732 * Channel was changed when sending it to the QoS thread - change it back
3733 * before sending it back to the original thread.
3734 */
3735 bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3736 bdev_io->internal.io_submit_ch = NULL;
3737 }
3738
3739 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3740 _spdk_bdev_io_submit, bdev_io);
3741 }
3742
3743 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3744 spdk_poller_unregister(&qos->poller);
3745
3746 free(qos);
3747
3748 _spdk_bdev_set_qos_limit_done(ctx, 0);
3749 }
3750
3751 static void
3752 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3753 {
3754 void *io_device = spdk_io_channel_iter_get_io_device(i);
3755 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3756 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3757 struct spdk_thread *thread;
3758
3759 pthread_mutex_lock(&bdev->internal.mutex);
3760 thread = bdev->internal.qos->thread;
3761 pthread_mutex_unlock(&bdev->internal.mutex);
3762
3763 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3764 }
3765
3766 static void
3767 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3768 {
3769 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3770 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3771
3772 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3773
3774 spdk_for_each_channel_continue(i, 0);
3775 }
3776
3777 static void
3778 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
3779 {
3780 struct set_qos_limit_ctx *ctx = cb_arg;
3781 struct spdk_bdev *bdev = ctx->bdev;
3782
3783 pthread_mutex_lock(&bdev->internal.mutex);
3784 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3785 pthread_mutex_unlock(&bdev->internal.mutex);
3786
3787 _spdk_bdev_set_qos_limit_done(ctx, 0);
3788 }
3789
3790 static void
3791 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3792 {
3793 void *io_device = spdk_io_channel_iter_get_io_device(i);
3794 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3795 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3796 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3797
3798 pthread_mutex_lock(&bdev->internal.mutex);
3799 _spdk_bdev_enable_qos(bdev, bdev_ch);
3800 pthread_mutex_unlock(&bdev->internal.mutex);
3801 spdk_for_each_channel_continue(i, 0);
3802 }
3803
3804 static void
3805 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3806 {
3807 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3808
3809 _spdk_bdev_set_qos_limit_done(ctx, status);
3810 }
3811
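/*
 * Apply caller-supplied limits to an existing QoS object: a value of
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED leaves that limit unchanged, while an
 * explicit 0 removes the limit by storing NOT_DEFINED.
 */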
3812 static void
3813 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
3814 {
3815 int i;
3816
3817 assert(bdev->internal.qos != NULL);
3818
3819 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3820 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3821 bdev->internal.qos->rate_limits[i].limit = limits[i];
3822
3823 if (limits[i] == 0) {
3824 bdev->internal.qos->rate_limits[i].limit =
3825 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3826 }
3827 }
3828 }
3829 }
3830
3831 void
3832 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
3833 void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3834 {
3835 struct set_qos_limit_ctx *ctx;
3836 uint32_t limit_set_complement;
3837 uint64_t min_limit_per_sec;
3838 int i;
3839 bool disable_rate_limit = true;
3840
3841 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3842 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3843 continue;
3844 }
3845
3846 if (limits[i] > 0) {
3847 disable_rate_limit = false;
3848 }
3849
3850 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3851 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3852 } else {
3853 /* Convert the limit from megabytes per second to bytes per second */
3854 limits[i] = limits[i] * 1024 * 1024;
3855 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3856 }
3857
3858 limit_set_complement = limits[i] % min_limit_per_sec;
3859 if (limit_set_complement) {
3860 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
3861 limits[i], min_limit_per_sec);
3862 limits[i] += min_limit_per_sec - limit_set_complement;
3863 SPDK_ERRLOG("Rounding the rate limit up to %" PRIu64 "\n", limits[i]);
3864 }
3865 }
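/*
 * Worked example for the conversion above (assuming the limits defined at the
 * top of this file): a request of 15 MB/s arrives as limits[i] = 15, is
 * converted to 15 * 1024 * 1024 = 15728640 bytes/s, and is then rounded up to
 * 20971520 (20 MiB/s) because it is not a multiple of
 * SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 MiB/s).
 */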
3866
3867 ctx = calloc(1, sizeof(*ctx));
3868 if (ctx == NULL) {
3869 cb_fn(cb_arg, -ENOMEM);
3870 return;
3871 }
3872
3873 ctx->cb_fn = cb_fn;
3874 ctx->cb_arg = cb_arg;
3875 ctx->bdev = bdev;
3876
3877 pthread_mutex_lock(&bdev->internal.mutex);
3878 if (bdev->internal.qos_mod_in_progress) {
3879 pthread_mutex_unlock(&bdev->internal.mutex);
3880 free(ctx);
3881 cb_fn(cb_arg, -EAGAIN);
3882 return;
3883 }
3884 bdev->internal.qos_mod_in_progress = true;
3885
3886 if (disable_rate_limit == true && bdev->internal.qos) {
3887 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3888 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
3889 (bdev->internal.qos->rate_limits[i].limit > 0 &&
3890 bdev->internal.qos->rate_limits[i].limit !=
3891 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
3892 disable_rate_limit = false;
3893 break;
3894 }
3895 }
3896 }
3897
3898 if (disable_rate_limit == false) {
3899 if (bdev->internal.qos == NULL) {
3900 /* Enabling: no QoS configured yet, so allocate it and enable it on every channel */
3901 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3902 if (!bdev->internal.qos) {
3903 pthread_mutex_unlock(&bdev->internal.mutex);
3904 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3905 free(ctx);
3906 cb_fn(cb_arg, -ENOMEM);
3907 return;
3908 }
3909
3910 _spdk_bdev_set_qos_rate_limits(bdev, limits);
3911
3912 spdk_for_each_channel(__bdev_to_io_dev(bdev),
3913 _spdk_bdev_enable_qos_msg, ctx,
3914 _spdk_bdev_enable_qos_done);
3915 } else {
3916 /* Updating: QoS is already running, so recompute its per-timeslice quotas on the QoS thread */
3917 _spdk_bdev_set_qos_rate_limits(bdev, limits);
3918
3919 spdk_thread_send_msg(bdev->internal.qos->thread,
3920 _spdk_bdev_update_qos_rate_limit_msg, ctx);
3921 }
3922 } else {
3923 if (bdev->internal.qos != NULL) {
3924 _spdk_bdev_set_qos_rate_limits(bdev, limits);
3925
3926 /* Disabling: clear the QoS flag on every channel, then tear down the QoS object on its thread */
3927 spdk_for_each_channel(__bdev_to_io_dev(bdev),
3928 _spdk_bdev_disable_qos_msg, ctx,
3929 _spdk_bdev_disable_qos_msg_done);
3930 } else {
3931 pthread_mutex_unlock(&bdev->internal.mutex);
3932 _spdk_bdev_set_qos_limit_done(ctx, 0);
3933 return;
3934 }
3935 }
3936
3937 pthread_mutex_unlock(&bdev->internal.mutex);
3938 }
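/*
 * Usage example (illustrative sketch): cap a bdev at 20000 read/write IOPS
 * while leaving the bandwidth limit untouched.  The function and callback
 * names are hypothetical, and the bdev pointer is assumed to have been
 * obtained elsewhere (e.g. from spdk_bdev_get_by_name()).  Index 0 is the
 * IOPS limit in this version (the first entry of qos_rpc_type), and 20000
 * satisfies the SPDK_BDEV_QOS_MIN_IOS_PER_SEC granularity check above;
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED means "do not change" and 0 means "remove
 * this limit".
 *
 *	static void
 *	example_qos_done(void *cb_arg, int status)
 *	{
 *		SPDK_NOTICELOG("QoS update completed with status %d\n", status);
 *	}
 *
 *	void
 *	example_set_qos(struct spdk_bdev *bdev)
 *	{
 *		uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *		int i;
 *
 *		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *			limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *		}
 *		limits[0] = 20000;
 *		spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_done, NULL);
 *	}
 */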
3939
3940 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3941
3942 SPDK_TRACE_REGISTER_FN(bdev_trace)
3943 {
3944 spdk_trace_register_owner(OWNER_BDEV, 'b');
3945 spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
3946 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
3947 OBJECT_BDEV_IO, 1, 0, "type: ");
3948 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
3949 OBJECT_BDEV_IO, 0, 0, "");
3950 }