4 * Copyright (c) Intel Corporation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "spdk/stdinc.h"
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
42 #include "spdk/conf.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
53 #define SPDK_RBD_QUEUE_DEPTH 128
55 static int bdev_rbd_count
= 0;
57 #define BDEV_RBD_POLL_US 50
60 struct spdk_bdev disk
;
65 rbd_image_info_t info
;
66 TAILQ_ENTRY(bdev_rbd
) tailq
;
67 struct spdk_poller
*reset_timer
;
68 struct spdk_bdev_io
*reset_bdev_io
;
71 struct bdev_rbd_io_channel
{
76 struct bdev_rbd
*disk
;
77 struct spdk_poller
*poller
;
81 uint64_t remaining_len
;
87 bdev_rbd_free(struct bdev_rbd
*rbd
)
97 spdk_bdev_rbd_free_config(rbd
->config
);
102 spdk_bdev_rbd_free_config(char **config
)
107 for (entry
= config
; *entry
; entry
++) {
115 spdk_bdev_rbd_dup_config(const char *const *config
)
123 for (count
= 0; config
[count
]; count
++) {}
124 copy
= calloc(count
+ 1, sizeof(*copy
));
128 for (count
= 0; config
[count
]; count
++) {
129 if (!(copy
[count
] = strdup(config
[count
]))) {
130 spdk_bdev_rbd_free_config(copy
);
138 bdev_rados_context_init(const char *user_id
, const char *rbd_pool_name
, const char *const *config
,
139 rados_t
*cluster
, rados_ioctx_t
*io_ctx
)
143 ret
= rados_create(cluster
, user_id
);
145 SPDK_ERRLOG("Failed to create rados_t struct\n");
150 const char *const *entry
= config
;
152 ret
= rados_conf_set(*cluster
, entry
[0], entry
[1]);
154 SPDK_ERRLOG("Failed to set %s = %s\n", entry
[0], entry
[1]);
155 rados_shutdown(*cluster
);
161 ret
= rados_conf_read_file(*cluster
, NULL
);
163 SPDK_ERRLOG("Failed to read conf file\n");
164 rados_shutdown(*cluster
);
169 ret
= rados_connect(*cluster
);
171 SPDK_ERRLOG("Failed to connect to rbd_pool\n");
172 rados_shutdown(*cluster
);
176 ret
= rados_ioctx_create(*cluster
, rbd_pool_name
, io_ctx
);
179 SPDK_ERRLOG("Failed to create ioctx\n");
180 rados_shutdown(*cluster
);
188 bdev_rbd_init(const char *user_id
, const char *rbd_pool_name
, const char *const *config
,
189 const char *rbd_name
, rbd_image_info_t
*info
)
192 rados_t cluster
= NULL
;
193 rados_ioctx_t io_ctx
= NULL
;
194 rbd_image_t image
= NULL
;
196 ret
= bdev_rados_context_init(user_id
, rbd_pool_name
, config
, &cluster
, &io_ctx
);
198 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n",
199 user_id
? user_id
: "admin (the default)", rbd_pool_name
);
203 ret
= rbd_open(io_ctx
, rbd_name
, &image
, NULL
);
205 SPDK_ERRLOG("Failed to open specified rbd device\n");
208 ret
= rbd_stat(image
, info
, sizeof(*info
));
211 SPDK_ERRLOG("Failed to stat specified rbd device\n");
215 rados_ioctx_destroy(io_ctx
);
218 rados_ioctx_destroy(io_ctx
);
219 rados_shutdown(cluster
);
224 bdev_rbd_exit(rbd_image_t image
)
231 bdev_rbd_finish_aiocb(rbd_completion_t cb
, void *arg
)
233 /* Doing nothing here */
237 bdev_rbd_start_aio(rbd_image_t image
, struct spdk_bdev_io
*bdev_io
,
238 void *buf
, uint64_t offset
, size_t len
)
241 rbd_completion_t comp
;
243 ret
= rbd_aio_create_completion(bdev_io
, bdev_rbd_finish_aiocb
,
249 if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_READ
) {
250 ret
= rbd_aio_read(image
, offset
, len
,
252 } else if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_WRITE
) {
253 ret
= rbd_aio_write(image
, offset
, len
,
255 } else if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_FLUSH
) {
256 ret
= rbd_aio_flush(image
, comp
);
260 rbd_aio_release(comp
);
267 static int bdev_rbd_library_init(void);
270 bdev_rbd_get_ctx_size(void)
272 return sizeof(struct bdev_rbd_io
);
275 static struct spdk_bdev_module rbd_if
= {
277 .module_init
= bdev_rbd_library_init
,
278 .get_ctx_size
= bdev_rbd_get_ctx_size
,
281 SPDK_BDEV_MODULE_REGISTER(rbd
, &rbd_if
)
284 bdev_rbd_rw(struct bdev_rbd
*disk
, struct spdk_io_channel
*ch
,
285 struct spdk_bdev_io
*bdev_io
, struct iovec
*iov
,
286 int iovcnt
, size_t len
, uint64_t offset
)
288 struct bdev_rbd_io
*rbd_io
= (struct bdev_rbd_io
*)bdev_io
->driver_ctx
;
289 struct bdev_rbd_io_channel
*rbdio_ch
= spdk_io_channel_get_ctx(ch
);
290 size_t remaining
= len
;
293 rbd_io
->remaining_len
= 0;
294 rbd_io
->num_segments
= 0;
295 rbd_io
->failed
= false;
297 for (i
= 0; i
< iovcnt
&& remaining
> 0; i
++) {
298 size_t seg_len
= spdk_min(remaining
, iov
[i
].iov_len
);
300 rc
= bdev_rbd_start_aio(rbdio_ch
->image
, bdev_io
, iov
[i
].iov_base
, offset
, seg_len
);
303 * This bdev_rbd_start_aio() call failed, but if any previous ones were
304 * submitted, we need to wait for them to finish.
306 if (rbd_io
->num_segments
== 0) {
307 /* No previous I/O submitted - return error code immediately. */
311 /* Return and wait for outstanding I/O to complete. */
312 rbd_io
->failed
= true;
316 rbd_io
->num_segments
++;
317 rbd_io
->remaining_len
+= seg_len
;
320 remaining
-= seg_len
;
327 bdev_rbd_flush(struct bdev_rbd
*disk
, struct spdk_io_channel
*ch
,
328 struct spdk_bdev_io
*bdev_io
, uint64_t offset
, uint64_t nbytes
)
330 struct bdev_rbd_io_channel
*rbdio_ch
= spdk_io_channel_get_ctx(ch
);
332 return bdev_rbd_start_aio(rbdio_ch
->image
, bdev_io
, NULL
, offset
, nbytes
);
336 bdev_rbd_reset_timer(void *arg
)
338 struct bdev_rbd
*disk
= arg
;
341 * TODO: This should check if any I/O is still in flight before completing the reset.
342 * For now, just complete after the timer expires.
344 spdk_bdev_io_complete(disk
->reset_bdev_io
, SPDK_BDEV_IO_STATUS_SUCCESS
);
345 spdk_poller_unregister(&disk
->reset_timer
);
346 disk
->reset_bdev_io
= NULL
;
352 bdev_rbd_reset(struct bdev_rbd
*disk
, struct spdk_bdev_io
*bdev_io
)
355 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
356 * timer to wait for in-flight I/O to complete.
358 assert(disk
->reset_bdev_io
== NULL
);
359 disk
->reset_bdev_io
= bdev_io
;
360 disk
->reset_timer
= spdk_poller_register(bdev_rbd_reset_timer
, disk
, 1 * 1000 * 1000);
366 bdev_rbd_destruct(void *ctx
)
368 struct bdev_rbd
*rbd
= ctx
;
370 spdk_io_device_unregister(rbd
, NULL
);
377 bdev_rbd_get_buf_cb(struct spdk_io_channel
*ch
, struct spdk_bdev_io
*bdev_io
,
383 spdk_bdev_io_complete(bdev_io
, SPDK_BDEV_IO_STATUS_FAILED
);
387 ret
= bdev_rbd_rw(bdev_io
->bdev
->ctxt
,
390 bdev_io
->u
.bdev
.iovs
,
391 bdev_io
->u
.bdev
.iovcnt
,
392 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
,
393 bdev_io
->u
.bdev
.offset_blocks
* bdev_io
->bdev
->blocklen
);
396 spdk_bdev_io_complete(bdev_io
, SPDK_BDEV_IO_STATUS_FAILED
);
400 static int _bdev_rbd_submit_request(struct spdk_io_channel
*ch
, struct spdk_bdev_io
*bdev_io
)
402 switch (bdev_io
->type
) {
403 case SPDK_BDEV_IO_TYPE_READ
:
404 spdk_bdev_io_get_buf(bdev_io
, bdev_rbd_get_buf_cb
,
405 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
);
408 case SPDK_BDEV_IO_TYPE_WRITE
:
409 return bdev_rbd_rw((struct bdev_rbd
*)bdev_io
->bdev
->ctxt
,
412 bdev_io
->u
.bdev
.iovs
,
413 bdev_io
->u
.bdev
.iovcnt
,
414 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
,
415 bdev_io
->u
.bdev
.offset_blocks
* bdev_io
->bdev
->blocklen
);
417 case SPDK_BDEV_IO_TYPE_FLUSH
:
418 return bdev_rbd_flush((struct bdev_rbd
*)bdev_io
->bdev
->ctxt
,
421 bdev_io
->u
.bdev
.offset_blocks
* bdev_io
->bdev
->blocklen
,
422 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
);
424 case SPDK_BDEV_IO_TYPE_RESET
:
425 return bdev_rbd_reset((struct bdev_rbd
*)bdev_io
->bdev
->ctxt
,
434 static void bdev_rbd_submit_request(struct spdk_io_channel
*ch
, struct spdk_bdev_io
*bdev_io
)
436 if (_bdev_rbd_submit_request(ch
, bdev_io
) < 0) {
437 spdk_bdev_io_complete(bdev_io
, SPDK_BDEV_IO_STATUS_FAILED
);
442 bdev_rbd_io_type_supported(void *ctx
, enum spdk_bdev_io_type io_type
)
445 case SPDK_BDEV_IO_TYPE_READ
:
446 case SPDK_BDEV_IO_TYPE_WRITE
:
447 case SPDK_BDEV_IO_TYPE_FLUSH
:
448 case SPDK_BDEV_IO_TYPE_RESET
:
457 bdev_rbd_io_poll(void *arg
)
459 struct bdev_rbd_io_channel
*ch
= arg
;
460 int i
, io_status
, rc
;
461 rbd_completion_t comps
[SPDK_RBD_QUEUE_DEPTH
];
462 struct spdk_bdev_io
*bdev_io
;
463 struct bdev_rbd_io
*rbd_io
;
465 rc
= poll(&ch
->pfd
, 1, 0);
467 /* check the return value of poll since we have only one fd for each channel */
472 rc
= rbd_poll_io_events(ch
->image
, comps
, SPDK_RBD_QUEUE_DEPTH
);
473 for (i
= 0; i
< rc
; i
++) {
474 bdev_io
= rbd_aio_get_arg(comps
[i
]);
475 rbd_io
= (struct bdev_rbd_io
*)bdev_io
->driver_ctx
;
476 io_status
= rbd_aio_get_return_value(comps
[i
]);
478 assert(rbd_io
->num_segments
> 0);
479 rbd_io
->num_segments
--;
481 if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_READ
) {
483 /* For reads, io_status is the length */
484 rbd_io
->remaining_len
-= io_status
;
487 if (rbd_io
->num_segments
== 0 && rbd_io
->remaining_len
!= 0) {
488 rbd_io
->failed
= true;
491 /* For others, 0 means success */
492 if (io_status
!= 0) {
493 rbd_io
->failed
= true;
497 rbd_aio_release(comps
[i
]);
499 if (rbd_io
->num_segments
== 0) {
500 spdk_bdev_io_complete(bdev_io
,
501 rbd_io
->failed
? SPDK_BDEV_IO_STATUS_FAILED
: SPDK_BDEV_IO_STATUS_SUCCESS
);
509 bdev_rbd_free_channel(struct bdev_rbd_io_channel
*ch
)
516 bdev_rbd_exit(ch
->image
);
520 rados_ioctx_destroy(ch
->io_ctx
);
524 rados_shutdown(ch
->cluster
);
527 if (ch
->pfd
.fd
>= 0) {
533 bdev_rbd_handle(void *arg
)
535 struct bdev_rbd_io_channel
*ch
= arg
;
538 if (rbd_open(ch
->io_ctx
, ch
->disk
->rbd_name
, &ch
->image
, NULL
) < 0) {
539 SPDK_ERRLOG("Failed to open specified rbd device\n");
547 bdev_rbd_create_cb(void *io_device
, void *ctx_buf
)
549 struct bdev_rbd_io_channel
*ch
= ctx_buf
;
552 ch
->disk
= io_device
;
557 ret
= bdev_rados_context_init(ch
->disk
->user_id
, ch
->disk
->pool_name
,
558 (const char *const *)ch
->disk
->config
,
559 &ch
->cluster
, &ch
->io_ctx
);
561 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n",
562 ch
->disk
->user_id
? ch
->disk
->user_id
: "admin (the default)", ch
->disk
->pool_name
);
566 if (spdk_call_unaffinitized(bdev_rbd_handle
, ch
) == NULL
) {
570 ch
->pfd
.fd
= eventfd(0, EFD_NONBLOCK
);
571 if (ch
->pfd
.fd
< 0) {
572 SPDK_ERRLOG("Failed to get eventfd\n");
576 ch
->pfd
.events
= POLLIN
;
577 ret
= rbd_set_image_notification(ch
->image
, ch
->pfd
.fd
, EVENT_TYPE_EVENTFD
);
579 SPDK_ERRLOG("Failed to set rbd image notification\n");
583 ch
->poller
= spdk_poller_register(bdev_rbd_io_poll
, ch
, BDEV_RBD_POLL_US
);
588 bdev_rbd_free_channel(ch
);
593 bdev_rbd_destroy_cb(void *io_device
, void *ctx_buf
)
595 struct bdev_rbd_io_channel
*io_channel
= ctx_buf
;
597 bdev_rbd_free_channel(io_channel
);
599 spdk_poller_unregister(&io_channel
->poller
);
602 static struct spdk_io_channel
*
603 bdev_rbd_get_io_channel(void *ctx
)
605 struct bdev_rbd
*rbd_bdev
= ctx
;
607 return spdk_get_io_channel(rbd_bdev
);
611 bdev_rbd_dump_info_json(void *ctx
, struct spdk_json_write_ctx
*w
)
613 struct bdev_rbd
*rbd_bdev
= ctx
;
615 spdk_json_write_named_object_begin(w
, "rbd");
617 spdk_json_write_named_string(w
, "pool_name", rbd_bdev
->pool_name
);
619 spdk_json_write_named_string(w
, "rbd_name", rbd_bdev
->rbd_name
);
621 if (rbd_bdev
->user_id
) {
622 spdk_json_write_named_string(w
, "user_id", rbd_bdev
->user_id
);
625 if (rbd_bdev
->config
) {
626 char **entry
= rbd_bdev
->config
;
628 spdk_json_write_named_object_begin(w
, "config");
630 spdk_json_write_named_string(w
, entry
[0], entry
[1]);
633 spdk_json_write_object_end(w
);
636 spdk_json_write_object_end(w
);
642 bdev_rbd_write_config_json(struct spdk_bdev
*bdev
, struct spdk_json_write_ctx
*w
)
644 struct bdev_rbd
*rbd
= bdev
->ctxt
;
646 spdk_json_write_object_begin(w
);
648 spdk_json_write_named_string(w
, "method", "construct_rbd_bdev");
650 spdk_json_write_named_object_begin(w
, "params");
651 spdk_json_write_named_string(w
, "name", bdev
->name
);
652 spdk_json_write_named_string(w
, "pool_name", rbd
->pool_name
);
653 spdk_json_write_named_string(w
, "rbd_name", rbd
->rbd_name
);
654 spdk_json_write_named_uint32(w
, "block_size", bdev
->blocklen
);
656 spdk_json_write_named_string(w
, "user_id", rbd
->user_id
);
660 char **entry
= rbd
->config
;
662 spdk_json_write_named_object_begin(w
, "config");
664 spdk_json_write_named_string(w
, entry
[0], entry
[1]);
667 spdk_json_write_object_end(w
);
670 spdk_json_write_object_end(w
);
672 spdk_json_write_object_end(w
);
675 static const struct spdk_bdev_fn_table rbd_fn_table
= {
676 .destruct
= bdev_rbd_destruct
,
677 .submit_request
= bdev_rbd_submit_request
,
678 .io_type_supported
= bdev_rbd_io_type_supported
,
679 .get_io_channel
= bdev_rbd_get_io_channel
,
680 .dump_info_json
= bdev_rbd_dump_info_json
,
681 .write_config_json
= bdev_rbd_write_config_json
,
685 spdk_bdev_rbd_create(const char *name
, const char *user_id
, const char *pool_name
,
686 const char *const *config
,
687 const char *rbd_name
,
690 struct bdev_rbd
*rbd
;
693 if ((pool_name
== NULL
) || (rbd_name
== NULL
)) {
697 rbd
= calloc(1, sizeof(struct bdev_rbd
));
699 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
703 rbd
->rbd_name
= strdup(rbd_name
);
704 if (!rbd
->rbd_name
) {
710 rbd
->user_id
= strdup(user_id
);
717 rbd
->pool_name
= strdup(pool_name
);
718 if (!rbd
->pool_name
) {
723 if (config
&& !(rbd
->config
= spdk_bdev_rbd_dup_config(config
))) {
728 ret
= bdev_rbd_init(rbd
->user_id
, rbd
->pool_name
,
729 (const char *const *)rbd
->config
,
730 rbd_name
, &rbd
->info
);
733 SPDK_ERRLOG("Failed to init rbd device\n");
738 rbd
->disk
.name
= strdup(name
);
740 rbd
->disk
.name
= spdk_sprintf_alloc("Ceph%d", bdev_rbd_count
);
742 if (!rbd
->disk
.name
) {
746 rbd
->disk
.product_name
= "Ceph Rbd Disk";
749 rbd
->disk
.write_cache
= 0;
750 rbd
->disk
.blocklen
= block_size
;
751 rbd
->disk
.blockcnt
= rbd
->info
.size
/ rbd
->disk
.blocklen
;
752 rbd
->disk
.ctxt
= rbd
;
753 rbd
->disk
.fn_table
= &rbd_fn_table
;
754 rbd
->disk
.module
= &rbd_if
;
756 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd
->disk
.name
);
758 spdk_io_device_register(rbd
, bdev_rbd_create_cb
,
760 sizeof(struct bdev_rbd_io_channel
),
762 ret
= spdk_bdev_register(&rbd
->disk
);
764 spdk_io_device_unregister(rbd
, NULL
);
773 spdk_bdev_rbd_delete(struct spdk_bdev
*bdev
, spdk_delete_rbd_complete cb_fn
, void *cb_arg
)
775 if (!bdev
|| bdev
->module
!= &rbd_if
) {
776 cb_fn(cb_arg
, -ENODEV
);
780 spdk_bdev_unregister(bdev
, cb_fn
, cb_arg
);
784 bdev_rbd_library_init(void)
788 const char *pool_name
;
789 const char *rbd_name
;
793 struct spdk_conf_section
*sp
= spdk_conf_find_section(NULL
, "Ceph");
797 * Ceph section not found. Do not initialize any rbd LUNS.
802 /* Init rbd block devices */
804 val
= spdk_conf_section_get_nval(sp
, "Ceph", i
);
809 /* get the Rbd_pool name */
810 pool_name
= spdk_conf_section_get_nmval(sp
, "Ceph", i
, 0);
811 if (pool_name
== NULL
) {
812 SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i
);
817 rbd_name
= spdk_conf_section_get_nmval(sp
, "Ceph", i
, 1);
818 if (rbd_name
== NULL
) {
819 SPDK_ERRLOG("Ceph%d: format error\n", i
);
824 val
= spdk_conf_section_get_nmval(sp
, "Ceph", i
, 2);
827 block_size
= 512; /* default value */
829 tmp
= spdk_strtol(val
, 10);
831 SPDK_ERRLOG("Invalid block size\n");
834 } else if (tmp
& 0x1ff) {
835 SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n",
840 block_size
= (uint32_t)tmp
;
843 /* TODO(?): user_id and rbd config values */
844 if (spdk_bdev_rbd_create(NULL
, NULL
, pool_name
, NULL
, rbd_name
, block_size
) == NULL
) {
854 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD
)