4 * Copyright (c) Intel Corporation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "spdk/stdinc.h"
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
42 #include "spdk/conf.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
53 #define SPDK_RBD_QUEUE_DEPTH 128
55 static int bdev_rbd_count
= 0;
57 #define BDEV_RBD_POLL_US 50
60 struct spdk_bdev disk
;
63 rbd_image_info_t info
;
64 TAILQ_ENTRY(bdev_rbd
) tailq
;
65 struct spdk_poller
*reset_timer
;
66 struct spdk_bdev_io
*reset_bdev_io
;
69 struct bdev_rbd_io_channel
{
74 struct bdev_rbd
*disk
;
75 struct spdk_poller
*poller
;
79 uint64_t remaining_len
;
85 bdev_rbd_free(struct bdev_rbd
*rbd
)
98 bdev_rados_context_init(const char *rbd_pool_name
, rados_t
*cluster
,
99 rados_ioctx_t
*io_ctx
)
103 ret
= rados_create(cluster
, NULL
);
105 SPDK_ERRLOG("Failed to create rados_t struct\n");
109 ret
= rados_conf_read_file(*cluster
, NULL
);
111 SPDK_ERRLOG("Failed to read conf file\n");
112 rados_shutdown(*cluster
);
116 ret
= rados_connect(*cluster
);
118 SPDK_ERRLOG("Failed to connect to rbd_pool\n");
119 rados_shutdown(*cluster
);
123 ret
= rados_ioctx_create(*cluster
, rbd_pool_name
, io_ctx
);
126 SPDK_ERRLOG("Failed to create ioctx\n");
127 rados_shutdown(*cluster
);
135 bdev_rbd_init(const char *rbd_pool_name
, const char *rbd_name
, rbd_image_info_t
*info
)
138 rados_t cluster
= NULL
;
139 rados_ioctx_t io_ctx
= NULL
;
140 rbd_image_t image
= NULL
;
142 ret
= bdev_rados_context_init(rbd_pool_name
, &cluster
, &io_ctx
);
144 SPDK_ERRLOG("Failed to create rados context for rbd_pool=%s\n",
149 ret
= rbd_open(io_ctx
, rbd_name
, &image
, NULL
);
151 SPDK_ERRLOG("Failed to open specified rbd device\n");
154 ret
= rbd_stat(image
, info
, sizeof(*info
));
157 SPDK_ERRLOG("Failed to stat specified rbd device\n");
161 rados_ioctx_destroy(io_ctx
);
164 rados_ioctx_destroy(io_ctx
);
165 rados_shutdown(cluster
);
170 bdev_rbd_exit(rbd_image_t image
)
177 bdev_rbd_finish_aiocb(rbd_completion_t cb
, void *arg
)
179 /* Doing nothing here */
183 bdev_rbd_start_aio(rbd_image_t image
, struct spdk_bdev_io
*bdev_io
,
184 void *buf
, uint64_t offset
, size_t len
)
187 rbd_completion_t comp
;
189 ret
= rbd_aio_create_completion(bdev_io
, bdev_rbd_finish_aiocb
,
195 if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_READ
) {
196 ret
= rbd_aio_read(image
, offset
, len
,
198 } else if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_WRITE
) {
199 ret
= rbd_aio_write(image
, offset
, len
,
201 } else if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_FLUSH
) {
202 ret
= rbd_aio_flush(image
, comp
);
206 rbd_aio_release(comp
);
213 static int bdev_rbd_library_init(void);
216 bdev_rbd_get_ctx_size(void)
218 return sizeof(struct bdev_rbd_io
);
221 static struct spdk_bdev_module rbd_if
= {
223 .module_init
= bdev_rbd_library_init
,
224 .get_ctx_size
= bdev_rbd_get_ctx_size
,
227 SPDK_BDEV_MODULE_REGISTER(&rbd_if
)
230 bdev_rbd_rw(struct bdev_rbd
*disk
, struct spdk_io_channel
*ch
,
231 struct spdk_bdev_io
*bdev_io
, struct iovec
*iov
,
232 int iovcnt
, size_t len
, uint64_t offset
)
234 struct bdev_rbd_io
*rbd_io
= (struct bdev_rbd_io
*)bdev_io
->driver_ctx
;
235 struct bdev_rbd_io_channel
*rbdio_ch
= spdk_io_channel_get_ctx(ch
);
236 size_t remaining
= len
;
239 rbd_io
->remaining_len
= 0;
240 rbd_io
->num_segments
= 0;
241 rbd_io
->failed
= false;
243 for (i
= 0; i
< iovcnt
&& remaining
> 0; i
++) {
244 size_t seg_len
= spdk_min(remaining
, iov
[i
].iov_len
);
246 rc
= bdev_rbd_start_aio(rbdio_ch
->image
, bdev_io
, iov
[i
].iov_base
, offset
, seg_len
);
249 * This bdev_rbd_start_aio() call failed, but if any previous ones were
250 * submitted, we need to wait for them to finish.
252 if (rbd_io
->num_segments
== 0) {
253 /* No previous I/O submitted - return error code immediately. */
257 /* Return and wait for outstanding I/O to complete. */
258 rbd_io
->failed
= true;
262 rbd_io
->num_segments
++;
263 rbd_io
->remaining_len
+= seg_len
;
266 remaining
-= seg_len
;
273 bdev_rbd_flush(struct bdev_rbd
*disk
, struct spdk_io_channel
*ch
,
274 struct spdk_bdev_io
*bdev_io
, uint64_t offset
, uint64_t nbytes
)
276 struct bdev_rbd_io_channel
*rbdio_ch
= spdk_io_channel_get_ctx(ch
);
278 return bdev_rbd_start_aio(rbdio_ch
->image
, bdev_io
, NULL
, offset
, nbytes
);
282 bdev_rbd_reset_timer(void *arg
)
284 struct bdev_rbd
*disk
= arg
;
287 * TODO: This should check if any I/O is still in flight before completing the reset.
288 * For now, just complete after the timer expires.
290 spdk_bdev_io_complete(disk
->reset_bdev_io
, SPDK_BDEV_IO_STATUS_SUCCESS
);
291 spdk_poller_unregister(&disk
->reset_timer
);
292 disk
->reset_bdev_io
= NULL
;
298 bdev_rbd_reset(struct bdev_rbd
*disk
, struct spdk_bdev_io
*bdev_io
)
301 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
302 * timer to wait for in-flight I/O to complete.
304 assert(disk
->reset_bdev_io
== NULL
);
305 disk
->reset_bdev_io
= bdev_io
;
306 disk
->reset_timer
= spdk_poller_register(bdev_rbd_reset_timer
, disk
, 1 * 1000 * 1000);
312 bdev_rbd_destruct(void *ctx
)
314 struct bdev_rbd
*rbd
= ctx
;
316 spdk_io_device_unregister(rbd
, NULL
);
322 static void bdev_rbd_get_buf_cb(struct spdk_io_channel
*ch
, struct spdk_bdev_io
*bdev_io
)
326 ret
= bdev_rbd_rw(bdev_io
->bdev
->ctxt
,
329 bdev_io
->u
.bdev
.iovs
,
330 bdev_io
->u
.bdev
.iovcnt
,
331 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
,
332 bdev_io
->u
.bdev
.offset_blocks
* bdev_io
->bdev
->blocklen
);
335 spdk_bdev_io_complete(bdev_io
, SPDK_BDEV_IO_STATUS_FAILED
);
339 static int _bdev_rbd_submit_request(struct spdk_io_channel
*ch
, struct spdk_bdev_io
*bdev_io
)
341 switch (bdev_io
->type
) {
342 case SPDK_BDEV_IO_TYPE_READ
:
343 spdk_bdev_io_get_buf(bdev_io
, bdev_rbd_get_buf_cb
,
344 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
);
347 case SPDK_BDEV_IO_TYPE_WRITE
:
348 return bdev_rbd_rw((struct bdev_rbd
*)bdev_io
->bdev
->ctxt
,
351 bdev_io
->u
.bdev
.iovs
,
352 bdev_io
->u
.bdev
.iovcnt
,
353 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
,
354 bdev_io
->u
.bdev
.offset_blocks
* bdev_io
->bdev
->blocklen
);
356 case SPDK_BDEV_IO_TYPE_FLUSH
:
357 return bdev_rbd_flush((struct bdev_rbd
*)bdev_io
->bdev
->ctxt
,
360 bdev_io
->u
.bdev
.offset_blocks
* bdev_io
->bdev
->blocklen
,
361 bdev_io
->u
.bdev
.num_blocks
* bdev_io
->bdev
->blocklen
);
363 case SPDK_BDEV_IO_TYPE_RESET
:
364 return bdev_rbd_reset((struct bdev_rbd
*)bdev_io
->bdev
->ctxt
,
373 static void bdev_rbd_submit_request(struct spdk_io_channel
*ch
, struct spdk_bdev_io
*bdev_io
)
375 if (_bdev_rbd_submit_request(ch
, bdev_io
) < 0) {
376 spdk_bdev_io_complete(bdev_io
, SPDK_BDEV_IO_STATUS_FAILED
);
381 bdev_rbd_io_type_supported(void *ctx
, enum spdk_bdev_io_type io_type
)
384 case SPDK_BDEV_IO_TYPE_READ
:
385 case SPDK_BDEV_IO_TYPE_WRITE
:
386 case SPDK_BDEV_IO_TYPE_FLUSH
:
387 case SPDK_BDEV_IO_TYPE_RESET
:
396 bdev_rbd_io_poll(void *arg
)
398 struct bdev_rbd_io_channel
*ch
= arg
;
399 int i
, io_status
, rc
;
400 rbd_completion_t comps
[SPDK_RBD_QUEUE_DEPTH
];
401 struct spdk_bdev_io
*bdev_io
;
402 struct bdev_rbd_io
*rbd_io
;
404 rc
= poll(&ch
->pfd
, 1, 0);
406 /* check the return value of poll since we have only one fd for each channel */
411 rc
= rbd_poll_io_events(ch
->image
, comps
, SPDK_RBD_QUEUE_DEPTH
);
412 for (i
= 0; i
< rc
; i
++) {
413 bdev_io
= rbd_aio_get_arg(comps
[i
]);
414 rbd_io
= (struct bdev_rbd_io
*)bdev_io
->driver_ctx
;
415 io_status
= rbd_aio_get_return_value(comps
[i
]);
417 assert(rbd_io
->num_segments
> 0);
418 rbd_io
->num_segments
--;
420 if (bdev_io
->type
== SPDK_BDEV_IO_TYPE_READ
) {
422 /* For reads, io_status is the length */
423 rbd_io
->remaining_len
-= io_status
;
426 if (rbd_io
->num_segments
== 0 && rbd_io
->remaining_len
!= 0) {
427 rbd_io
->failed
= true;
430 /* For others, 0 means success */
431 if (io_status
!= 0) {
432 rbd_io
->failed
= true;
436 rbd_aio_release(comps
[i
]);
438 if (rbd_io
->num_segments
== 0) {
439 spdk_bdev_io_complete(bdev_io
,
440 rbd_io
->failed
? SPDK_BDEV_IO_STATUS_FAILED
: SPDK_BDEV_IO_STATUS_SUCCESS
);
448 bdev_rbd_free_channel(struct bdev_rbd_io_channel
*ch
)
455 bdev_rbd_exit(ch
->image
);
459 rados_ioctx_destroy(ch
->io_ctx
);
463 rados_shutdown(ch
->cluster
);
466 if (ch
->pfd
.fd
>= 0) {
472 bdev_rbd_handle(void *arg
)
474 struct bdev_rbd_io_channel
*ch
= arg
;
477 if (rbd_open(ch
->io_ctx
, ch
->disk
->rbd_name
, &ch
->image
, NULL
) < 0) {
478 SPDK_ERRLOG("Failed to open specified rbd device\n");
486 bdev_rbd_create_cb(void *io_device
, void *ctx_buf
)
488 struct bdev_rbd_io_channel
*ch
= ctx_buf
;
491 ch
->disk
= io_device
;
496 ret
= bdev_rados_context_init(ch
->disk
->pool_name
, &ch
->cluster
, &ch
->io_ctx
);
498 SPDK_ERRLOG("Failed to create rados context for rbd_pool=%s\n",
499 ch
->disk
->pool_name
);
503 if (spdk_call_unaffinitized(bdev_rbd_handle
, ch
) == NULL
) {
507 ch
->pfd
.fd
= eventfd(0, EFD_NONBLOCK
);
508 if (ch
->pfd
.fd
< 0) {
509 SPDK_ERRLOG("Failed to get eventfd\n");
513 ch
->pfd
.events
= POLLIN
;
514 ret
= rbd_set_image_notification(ch
->image
, ch
->pfd
.fd
, EVENT_TYPE_EVENTFD
);
516 SPDK_ERRLOG("Failed to set rbd image notification\n");
520 ch
->poller
= spdk_poller_register(bdev_rbd_io_poll
, ch
, BDEV_RBD_POLL_US
);
525 bdev_rbd_free_channel(ch
);
530 bdev_rbd_destroy_cb(void *io_device
, void *ctx_buf
)
532 struct bdev_rbd_io_channel
*io_channel
= ctx_buf
;
534 bdev_rbd_free_channel(io_channel
);
536 spdk_poller_unregister(&io_channel
->poller
);
539 static struct spdk_io_channel
*
540 bdev_rbd_get_io_channel(void *ctx
)
542 struct bdev_rbd
*rbd_bdev
= ctx
;
544 return spdk_get_io_channel(rbd_bdev
);
548 bdev_rbd_dump_info_json(void *ctx
, struct spdk_json_write_ctx
*w
)
550 struct bdev_rbd
*rbd_bdev
= ctx
;
552 spdk_json_write_name(w
, "rbd");
553 spdk_json_write_object_begin(w
);
555 spdk_json_write_name(w
, "pool_name");
556 spdk_json_write_string(w
, rbd_bdev
->pool_name
);
558 spdk_json_write_name(w
, "rbd_name");
559 spdk_json_write_string(w
, rbd_bdev
->rbd_name
);
561 spdk_json_write_object_end(w
);
567 bdev_rbd_write_config_json(struct spdk_bdev
*bdev
, struct spdk_json_write_ctx
*w
)
569 struct bdev_rbd
*rbd
= bdev
->ctxt
;
571 spdk_json_write_object_begin(w
);
573 spdk_json_write_named_string(w
, "method", "construct_rbd_bdev");
575 spdk_json_write_named_object_begin(w
, "params");
576 spdk_json_write_named_string(w
, "name", bdev
->name
);
577 spdk_json_write_named_string(w
, "pool_name", rbd
->pool_name
);
578 spdk_json_write_named_string(w
, "rbd_name", rbd
->rbd_name
);
579 spdk_json_write_named_uint32(w
, "block_size", bdev
->blocklen
);
580 spdk_json_write_object_end(w
);
582 spdk_json_write_object_end(w
);
585 static const struct spdk_bdev_fn_table rbd_fn_table
= {
586 .destruct
= bdev_rbd_destruct
,
587 .submit_request
= bdev_rbd_submit_request
,
588 .io_type_supported
= bdev_rbd_io_type_supported
,
589 .get_io_channel
= bdev_rbd_get_io_channel
,
590 .dump_info_json
= bdev_rbd_dump_info_json
,
591 .write_config_json
= bdev_rbd_write_config_json
,
595 spdk_bdev_rbd_create(const char *name
, const char *pool_name
, const char *rbd_name
,
598 struct bdev_rbd
*rbd
;
601 if ((pool_name
== NULL
) || (rbd_name
== NULL
)) {
605 rbd
= calloc(1, sizeof(struct bdev_rbd
));
607 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
611 rbd
->rbd_name
= strdup(rbd_name
);
612 if (!rbd
->rbd_name
) {
617 rbd
->pool_name
= strdup(pool_name
);
618 if (!rbd
->pool_name
) {
623 ret
= bdev_rbd_init(rbd
->pool_name
, rbd_name
, &rbd
->info
);
626 SPDK_ERRLOG("Failed to init rbd device\n");
631 rbd
->disk
.name
= strdup(name
);
633 rbd
->disk
.name
= spdk_sprintf_alloc("Ceph%d", bdev_rbd_count
);
635 if (!rbd
->disk
.name
) {
639 rbd
->disk
.product_name
= "Ceph Rbd Disk";
642 rbd
->disk
.write_cache
= 0;
643 rbd
->disk
.blocklen
= block_size
;
644 rbd
->disk
.blockcnt
= rbd
->info
.size
/ rbd
->disk
.blocklen
;
645 rbd
->disk
.ctxt
= rbd
;
646 rbd
->disk
.fn_table
= &rbd_fn_table
;
647 rbd
->disk
.module
= &rbd_if
;
649 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd
->disk
.name
);
651 spdk_io_device_register(rbd
, bdev_rbd_create_cb
,
653 sizeof(struct bdev_rbd_io_channel
),
655 ret
= spdk_bdev_register(&rbd
->disk
);
657 spdk_io_device_unregister(rbd
, NULL
);
666 spdk_bdev_rbd_delete(struct spdk_bdev
*bdev
, spdk_delete_rbd_complete cb_fn
, void *cb_arg
)
668 if (!bdev
|| bdev
->module
!= &rbd_if
) {
669 cb_fn(cb_arg
, -ENODEV
);
673 spdk_bdev_unregister(bdev
, cb_fn
, cb_arg
);
677 bdev_rbd_library_init(void)
681 const char *pool_name
;
682 const char *rbd_name
;
685 struct spdk_conf_section
*sp
= spdk_conf_find_section(NULL
, "Ceph");
689 * Ceph section not found. Do not initialize any rbd LUNS.
694 /* Init rbd block devices */
696 val
= spdk_conf_section_get_nval(sp
, "Ceph", i
);
701 /* get the Rbd_pool name */
702 pool_name
= spdk_conf_section_get_nmval(sp
, "Ceph", i
, 0);
703 if (pool_name
== NULL
) {
704 SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i
);
709 rbd_name
= spdk_conf_section_get_nmval(sp
, "Ceph", i
, 1);
710 if (rbd_name
== NULL
) {
711 SPDK_ERRLOG("Ceph%d: format error\n", i
);
716 val
= spdk_conf_section_get_nmval(sp
, "Ceph", i
, 2);
719 block_size
= 512; /* default value */
721 block_size
= (int)strtol(val
, NULL
, 10);
722 if (block_size
& 0x1ff) {
723 SPDK_ERRLOG("current block_size = %d, it should be multiple of 512\n",
730 if (spdk_bdev_rbd_create(NULL
, pool_name
, rbd_name
, block_size
) == NULL
) {
740 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD
)