]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/bdev/rbd/bdev_rbd.c
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / spdk / lib / bdev / rbd / bdev_rbd.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "spdk/stdinc.h"
35
36 #include "bdev_rbd.h"
37
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41
42 #include "spdk/conf.h"
43 #include "spdk/env.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
49
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
52
/* Capacity of the completion array drained by one rbd_poll_io_events() call in bdev_rbd_io_poll(). */
53 #define SPDK_RBD_QUEUE_DEPTH 128
54
/* Incremented for every bdev created; supplies the index in the default "Ceph%d" name. */
55 static int bdev_rbd_count = 0;
56
/* Period argument handed to spdk_poller_register() for the per-channel completion poller. */
57 #define BDEV_RBD_POLL_US 50
58
/*
 * State for one RBD-backed block device. Allocated in spdk_bdev_rbd_create()
 * and released by bdev_rbd_free(). The structure's address is also used as
 * the io_device handle passed to spdk_io_device_register().
 */
59 struct bdev_rbd {
60 struct spdk_bdev disk; /* bdev handed to spdk_bdev_register(); disk.ctxt points back to this struct */
61 char *rbd_name; /* strdup()'d image name, freed in bdev_rbd_free() */
62 char *pool_name; /* strdup()'d pool name, freed in bdev_rbd_free() */
63 rbd_image_info_t info; /* filled by rbd_stat() in bdev_rbd_init(); info.size determines blockcnt */
64 TAILQ_ENTRY(bdev_rbd) tailq; /* list linkage; not referenced by the code visible in this file */
65 struct spdk_poller *reset_timer; /* poller armed by bdev_rbd_reset(), unregistered in bdev_rbd_reset_timer() */
66 struct spdk_bdev_io *reset_bdev_io; /* reset request awaiting completion; NULL when no reset is pending */
67 };
68
/*
 * Per-I/O-channel context. Populated by bdev_rbd_create_cb() and torn down by
 * bdev_rbd_destroy_cb() / bdev_rbd_free_channel().
 */
69 struct bdev_rbd_io_channel {
70 rados_ioctx_t io_ctx; /* pool I/O context from bdev_rados_context_init() */
71 rados_t cluster; /* cluster handle from bdev_rados_context_init() */
72 struct pollfd pfd; /* eventfd registered with rbd_set_image_notification(); fd is -1 when unset */
73 rbd_image_t image; /* image opened in bdev_rbd_handle(); NULL when not open */
74 struct bdev_rbd *disk; /* the io_device this channel belongs to */
75 struct spdk_poller *poller; /* runs bdev_rbd_io_poll() for this channel */
76 };
77
/*
 * Per-request driver context stored in spdk_bdev_io::driver_ctx (its size is
 * reported by bdev_rbd_get_ctx_size()).
 */
78 struct bdev_rbd_io {
79 uint64_t remaining_len; /* bytes submitted by bdev_rbd_rw(); read completions subtract the bytes returned */
80 int num_segments; /* aio operations still outstanding; the request completes when this reaches 0 */
81 bool failed; /* set once any segment fails; selects the final completion status */
82 };
83
84 static void
85 bdev_rbd_free(struct bdev_rbd *rbd)
86 {
87 if (!rbd) {
88 return;
89 }
90
91 free(rbd->disk.name);
92 free(rbd->rbd_name);
93 free(rbd->pool_name);
94 free(rbd);
95 }
96
97 static int
98 bdev_rados_context_init(const char *rbd_pool_name, rados_t *cluster,
99 rados_ioctx_t *io_ctx)
100 {
101 int ret;
102
103 ret = rados_create(cluster, NULL);
104 if (ret < 0) {
105 SPDK_ERRLOG("Failed to create rados_t struct\n");
106 return -1;
107 }
108
109 ret = rados_conf_read_file(*cluster, NULL);
110 if (ret < 0) {
111 SPDK_ERRLOG("Failed to read conf file\n");
112 rados_shutdown(*cluster);
113 return -1;
114 }
115
116 ret = rados_connect(*cluster);
117 if (ret < 0) {
118 SPDK_ERRLOG("Failed to connect to rbd_pool\n");
119 rados_shutdown(*cluster);
120 return -1;
121 }
122
123 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx);
124
125 if (ret < 0) {
126 SPDK_ERRLOG("Failed to create ioctx\n");
127 rados_shutdown(*cluster);
128 return -1;
129 }
130
131 return 0;
132 }
133
134 static int
135 bdev_rbd_init(const char *rbd_pool_name, const char *rbd_name, rbd_image_info_t *info)
136 {
137 int ret;
138 rados_t cluster = NULL;
139 rados_ioctx_t io_ctx = NULL;
140 rbd_image_t image = NULL;
141
142 ret = bdev_rados_context_init(rbd_pool_name, &cluster, &io_ctx);
143 if (ret < 0) {
144 SPDK_ERRLOG("Failed to create rados context for rbd_pool=%s\n",
145 rbd_pool_name);
146 return -1;
147 }
148
149 ret = rbd_open(io_ctx, rbd_name, &image, NULL);
150 if (ret < 0) {
151 SPDK_ERRLOG("Failed to open specified rbd device\n");
152 goto err;
153 }
154 ret = rbd_stat(image, info, sizeof(*info));
155 rbd_close(image);
156 if (ret < 0) {
157 SPDK_ERRLOG("Failed to stat specified rbd device\n");
158 goto err;
159 }
160
161 rados_ioctx_destroy(io_ctx);
162 return 0;
163 err:
164 rados_ioctx_destroy(io_ctx);
165 rados_shutdown(cluster);
166 return -1;
167 }
168
169 static void
170 bdev_rbd_exit(rbd_image_t image)
171 {
172 rbd_flush(image);
173 rbd_close(image);
174 }
175
176 static void
177 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
178 {
179 /* Doing nothing here */
180 }
181
182 static int
183 bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io,
184 void *buf, uint64_t offset, size_t len)
185 {
186 int ret;
187 rbd_completion_t comp;
188
189 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
190 &comp);
191 if (ret < 0) {
192 return -1;
193 }
194
195 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
196 ret = rbd_aio_read(image, offset, len,
197 buf, comp);
198 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
199 ret = rbd_aio_write(image, offset, len,
200 buf, comp);
201 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
202 ret = rbd_aio_flush(image, comp);
203 }
204
205 if (ret < 0) {
206 rbd_aio_release(comp);
207 return -1;
208 }
209
210 return 0;
211 }
212
/* Forward declaration: the definition is at the end of the file, but rbd_if below refers to it. */
213 static int bdev_rbd_library_init(void);
214
215 static int
216 bdev_rbd_get_ctx_size(void)
217 {
218 return sizeof(struct bdev_rbd_io);
219 }
220
221 static struct spdk_bdev_module rbd_if = {
222 .name = "rbd",
223 .module_init = bdev_rbd_library_init,
224 .get_ctx_size = bdev_rbd_get_ctx_size,
225
226 };
227 SPDK_BDEV_MODULE_REGISTER(&rbd_if)
228
229 static int64_t
230 bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch,
231 struct spdk_bdev_io *bdev_io, struct iovec *iov,
232 int iovcnt, size_t len, uint64_t offset)
233 {
234 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
235 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
236 size_t remaining = len;
237 int i, rc;
238
239 rbd_io->remaining_len = 0;
240 rbd_io->num_segments = 0;
241 rbd_io->failed = false;
242
243 for (i = 0; i < iovcnt && remaining > 0; i++) {
244 size_t seg_len = spdk_min(remaining, iov[i].iov_len);
245
246 rc = bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov[i].iov_base, offset, seg_len);
247 if (rc) {
248 /*
249 * This bdev_rbd_start_aio() call failed, but if any previous ones were
250 * submitted, we need to wait for them to finish.
251 */
252 if (rbd_io->num_segments == 0) {
253 /* No previous I/O submitted - return error code immediately. */
254 return rc;
255 }
256
257 /* Return and wait for outstanding I/O to complete. */
258 rbd_io->failed = true;
259 return 0;
260 }
261
262 rbd_io->num_segments++;
263 rbd_io->remaining_len += seg_len;
264
265 offset += seg_len;
266 remaining -= seg_len;
267 }
268
269 return 0;
270 }
271
272 static int64_t
273 bdev_rbd_flush(struct bdev_rbd *disk, struct spdk_io_channel *ch,
274 struct spdk_bdev_io *bdev_io, uint64_t offset, uint64_t nbytes)
275 {
276 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
277
278 return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, NULL, offset, nbytes);
279 }
280
281 static int
282 bdev_rbd_reset_timer(void *arg)
283 {
284 struct bdev_rbd *disk = arg;
285
286 /*
287 * TODO: This should check if any I/O is still in flight before completing the reset.
288 * For now, just complete after the timer expires.
289 */
290 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
291 spdk_poller_unregister(&disk->reset_timer);
292 disk->reset_bdev_io = NULL;
293
294 return -1;
295 }
296
297 static int
298 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
299 {
300 /*
301 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
302 * timer to wait for in-flight I/O to complete.
303 */
304 assert(disk->reset_bdev_io == NULL);
305 disk->reset_bdev_io = bdev_io;
306 disk->reset_timer = spdk_poller_register(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
307
308 return 0;
309 }
310
/*
 * bdev destruct hook: unregister the io_device keyed by this bdev_rbd and
 * release the structure. Always returns 0.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *disk = ctx;

	spdk_io_device_unregister(disk, NULL);
	bdev_rbd_free(disk);

	return 0;
}
321
322 static void bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
323 {
324 int ret;
325
326 ret = bdev_rbd_rw(bdev_io->bdev->ctxt,
327 ch,
328 bdev_io,
329 bdev_io->u.bdev.iovs,
330 bdev_io->u.bdev.iovcnt,
331 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
332 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
333
334 if (ret != 0) {
335 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
336 }
337 }
338
339 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
340 {
341 switch (bdev_io->type) {
342 case SPDK_BDEV_IO_TYPE_READ:
343 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
344 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
345 return 0;
346
347 case SPDK_BDEV_IO_TYPE_WRITE:
348 return bdev_rbd_rw((struct bdev_rbd *)bdev_io->bdev->ctxt,
349 ch,
350 bdev_io,
351 bdev_io->u.bdev.iovs,
352 bdev_io->u.bdev.iovcnt,
353 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
354 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
355
356 case SPDK_BDEV_IO_TYPE_FLUSH:
357 return bdev_rbd_flush((struct bdev_rbd *)bdev_io->bdev->ctxt,
358 ch,
359 bdev_io,
360 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
361 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
362
363 case SPDK_BDEV_IO_TYPE_RESET:
364 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
365 bdev_io);
366
367 default:
368 return -1;
369 }
370 return 0;
371 }
372
373 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
374 {
375 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) {
376 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
377 }
378 }
379
380 static bool
381 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
382 {
383 switch (io_type) {
384 case SPDK_BDEV_IO_TYPE_READ:
385 case SPDK_BDEV_IO_TYPE_WRITE:
386 case SPDK_BDEV_IO_TYPE_FLUSH:
387 case SPDK_BDEV_IO_TYPE_RESET:
388 return true;
389
390 default:
391 return false;
392 }
393 }
394
395 static int
396 bdev_rbd_io_poll(void *arg)
397 {
398 struct bdev_rbd_io_channel *ch = arg;
399 int i, io_status, rc;
400 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
401 struct spdk_bdev_io *bdev_io;
402 struct bdev_rbd_io *rbd_io;
403
404 rc = poll(&ch->pfd, 1, 0);
405
406 /* check the return value of poll since we have only one fd for each channel */
407 if (rc != 1) {
408 return 0;
409 }
410
411 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
412 for (i = 0; i < rc; i++) {
413 bdev_io = rbd_aio_get_arg(comps[i]);
414 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
415 io_status = rbd_aio_get_return_value(comps[i]);
416
417 assert(rbd_io->num_segments > 0);
418 rbd_io->num_segments--;
419
420 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
421 if (io_status > 0) {
422 /* For reads, io_status is the length */
423 rbd_io->remaining_len -= io_status;
424 }
425
426 if (rbd_io->num_segments == 0 && rbd_io->remaining_len != 0) {
427 rbd_io->failed = true;
428 }
429 } else {
430 /* For others, 0 means success */
431 if (io_status != 0) {
432 rbd_io->failed = true;
433 }
434 }
435
436 rbd_aio_release(comps[i]);
437
438 if (rbd_io->num_segments == 0) {
439 spdk_bdev_io_complete(bdev_io,
440 rbd_io->failed ? SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS);
441 }
442 }
443
444 return rc;
445 }
446
447 static void
448 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
449 {
450 if (!ch) {
451 return;
452 }
453
454 if (ch->image) {
455 bdev_rbd_exit(ch->image);
456 }
457
458 if (ch->io_ctx) {
459 rados_ioctx_destroy(ch->io_ctx);
460 }
461
462 if (ch->cluster) {
463 rados_shutdown(ch->cluster);
464 }
465
466 if (ch->pfd.fd >= 0) {
467 close(ch->pfd.fd);
468 }
469 }
470
471 static void *
472 bdev_rbd_handle(void *arg)
473 {
474 struct bdev_rbd_io_channel *ch = arg;
475 void *ret = arg;
476
477 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
478 SPDK_ERRLOG("Failed to open specified rbd device\n");
479 ret = NULL;
480 }
481
482 return ret;
483 }
484
485 static int
486 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
487 {
488 struct bdev_rbd_io_channel *ch = ctx_buf;
489 int ret;
490
491 ch->disk = io_device;
492 ch->image = NULL;
493 ch->io_ctx = NULL;
494 ch->pfd.fd = -1;
495
496 ret = bdev_rados_context_init(ch->disk->pool_name, &ch->cluster, &ch->io_ctx);
497 if (ret < 0) {
498 SPDK_ERRLOG("Failed to create rados context for rbd_pool=%s\n",
499 ch->disk->pool_name);
500 goto err;
501 }
502
503 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
504 goto err;
505 }
506
507 ch->pfd.fd = eventfd(0, EFD_NONBLOCK);
508 if (ch->pfd.fd < 0) {
509 SPDK_ERRLOG("Failed to get eventfd\n");
510 goto err;
511 }
512
513 ch->pfd.events = POLLIN;
514 ret = rbd_set_image_notification(ch->image, ch->pfd.fd, EVENT_TYPE_EVENTFD);
515 if (ret < 0) {
516 SPDK_ERRLOG("Failed to set rbd image notification\n");
517 goto err;
518 }
519
520 ch->poller = spdk_poller_register(bdev_rbd_io_poll, ch, BDEV_RBD_POLL_US);
521
522 return 0;
523
524 err:
525 bdev_rbd_free_channel(ch);
526 return -1;
527 }
528
529 static void
530 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
531 {
532 struct bdev_rbd_io_channel *io_channel = ctx_buf;
533
534 bdev_rbd_free_channel(io_channel);
535
536 spdk_poller_unregister(&io_channel->poller);
537 }
538
/* get_io_channel hook: obtain an I/O channel for the io_device keyed by this bdev_rbd. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *disk = ctx;
	struct spdk_io_channel *channel = spdk_get_io_channel(disk);

	return channel;
}
546
547 static int
548 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
549 {
550 struct bdev_rbd *rbd_bdev = ctx;
551
552 spdk_json_write_name(w, "rbd");
553 spdk_json_write_object_begin(w);
554
555 spdk_json_write_name(w, "pool_name");
556 spdk_json_write_string(w, rbd_bdev->pool_name);
557
558 spdk_json_write_name(w, "rbd_name");
559 spdk_json_write_string(w, rbd_bdev->rbd_name);
560
561 spdk_json_write_object_end(w);
562
563 return 0;
564 }
565
566 static void
567 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
568 {
569 struct bdev_rbd *rbd = bdev->ctxt;
570
571 spdk_json_write_object_begin(w);
572
573 spdk_json_write_named_string(w, "method", "construct_rbd_bdev");
574
575 spdk_json_write_named_object_begin(w, "params");
576 spdk_json_write_named_string(w, "name", bdev->name);
577 spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
578 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
579 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
580 spdk_json_write_object_end(w);
581
582 spdk_json_write_object_end(w);
583 }
584
585 static const struct spdk_bdev_fn_table rbd_fn_table = {
586 .destruct = bdev_rbd_destruct,
587 .submit_request = bdev_rbd_submit_request,
588 .io_type_supported = bdev_rbd_io_type_supported,
589 .get_io_channel = bdev_rbd_get_io_channel,
590 .dump_info_json = bdev_rbd_dump_info_json,
591 .write_config_json = bdev_rbd_write_config_json,
592 };
593
594 struct spdk_bdev *
595 spdk_bdev_rbd_create(const char *name, const char *pool_name, const char *rbd_name,
596 uint32_t block_size)
597 {
598 struct bdev_rbd *rbd;
599 int ret;
600
601 if ((pool_name == NULL) || (rbd_name == NULL)) {
602 return NULL;
603 }
604
605 rbd = calloc(1, sizeof(struct bdev_rbd));
606 if (rbd == NULL) {
607 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
608 return NULL;
609 }
610
611 rbd->rbd_name = strdup(rbd_name);
612 if (!rbd->rbd_name) {
613 bdev_rbd_free(rbd);
614 return NULL;
615 }
616
617 rbd->pool_name = strdup(pool_name);
618 if (!rbd->pool_name) {
619 bdev_rbd_free(rbd);
620 return NULL;
621 }
622
623 ret = bdev_rbd_init(rbd->pool_name, rbd_name, &rbd->info);
624 if (ret < 0) {
625 bdev_rbd_free(rbd);
626 SPDK_ERRLOG("Failed to init rbd device\n");
627 return NULL;
628 }
629
630 if (name) {
631 rbd->disk.name = strdup(name);
632 } else {
633 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
634 }
635 if (!rbd->disk.name) {
636 bdev_rbd_free(rbd);
637 return NULL;
638 }
639 rbd->disk.product_name = "Ceph Rbd Disk";
640 bdev_rbd_count++;
641
642 rbd->disk.write_cache = 0;
643 rbd->disk.blocklen = block_size;
644 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
645 rbd->disk.ctxt = rbd;
646 rbd->disk.fn_table = &rbd_fn_table;
647 rbd->disk.module = &rbd_if;
648
649 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
650
651 spdk_io_device_register(rbd, bdev_rbd_create_cb,
652 bdev_rbd_destroy_cb,
653 sizeof(struct bdev_rbd_io_channel),
654 rbd_name);
655 ret = spdk_bdev_register(&rbd->disk);
656 if (ret) {
657 spdk_io_device_unregister(rbd, NULL);
658 bdev_rbd_free(rbd);
659 return NULL;
660 }
661
662 return &rbd->disk;
663 }
664
665 void
666 spdk_bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
667 {
668 if (!bdev || bdev->module != &rbd_if) {
669 cb_fn(cb_arg, -ENODEV);
670 return;
671 }
672
673 spdk_bdev_unregister(bdev, cb_fn, cb_arg);
674 }
675
/*
 * Module init hook: create one RBD bdev for every "Ceph" entry in the [Ceph]
 * configuration section.
 *
 * Each entry supplies the pool name (value 0), the image name (value 1) and
 * an optional block size (value 2, default 512). The block size must be a
 * plain decimal number, non-zero, a multiple of 512 and representable in
 * 32 bits; anything else is rejected rather than being passed on as a zero
 * or wrapped value.
 *
 * Returns 0 when the section is absent or every entry was created, and -1 on
 * the first malformed entry or creation failure.
 */
static int
bdev_rbd_library_init(void)
{
	struct spdk_conf_section *sp = spdk_conf_find_section(NULL, "Ceph");
	int i;

	if (sp == NULL) {
		/* Ceph section not found. Do not initialize any rbd LUNS. */
		return 0;
	}

	/* Init rbd block devices */
	for (i = 0; ; i++) {
		const char *val;
		const char *pool_name;
		const char *rbd_name;
		uint32_t block_size = 512; /* default value */

		val = spdk_conf_section_get_nval(sp, "Ceph", i);
		if (val == NULL) {
			break;
		}

		/* get the Rbd_pool name */
		pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0);
		if (pool_name == NULL) {
			SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i);
			return -1;
		}

		rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1);
		if (rbd_name == NULL) {
			SPDK_ERRLOG("Ceph%d: format error\n", i);
			return -1;
		}

		val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2);
		if (val != NULL) {
			char *end = NULL;
			long parsed = strtol(val, &end, 10);

			/*
			 * Reject empty or non-numeric text, trailing garbage, zero and
			 * negative values, values that do not fit in uint32_t, and
			 * values that are not a multiple of 512. An out-of-range
			 * strtol() result (LONG_MAX) is caught by the last two checks.
			 */
			if (end == val || *end != '\0' || parsed <= 0 ||
			    (unsigned long)parsed > UINT32_MAX || (parsed & 0x1ff) != 0) {
				SPDK_ERRLOG("current block_size = %s, it should be a non-zero multiple of 512\n",
					    val);
				return -1;
			}

			block_size = (uint32_t)parsed;
		}

		if (spdk_bdev_rbd_create(NULL, pool_name, rbd_name, block_size) == NULL) {
			return -1;
		}
	}

	return 0;
}
739
/*
 * Registers the "bdev_rbd" log component under the SPDK_LOG_BDEV_RBD flag.
 * NOTE(review): the macro is defined outside this file; presumably it comes
 * from spdk_internal/log.h - confirm there what it expands to.
 */
740 SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD)