]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/bdev/rbd/bdev_rbd.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / lib / bdev / rbd / bdev_rbd.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "spdk/stdinc.h"
35
36 #include "bdev_rbd.h"
37
38 #include <rbd/librbd.h>
39 #include <rados/librados.h>
40 #include <sys/eventfd.h>
41
42 #include "spdk/conf.h"
43 #include "spdk/env.h"
44 #include "spdk/bdev.h"
45 #include "spdk/thread.h"
46 #include "spdk/json.h"
47 #include "spdk/string.h"
48 #include "spdk/util.h"
49
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
52
/* Size of the completion array handed to rbd_poll_io_events() on each poll. */
#define SPDK_RBD_QUEUE_DEPTH 128

/*
 * Counter used to build the default "Ceph%d" bdev name; incremented each
 * time spdk_bdev_rbd_create() gets past naming a new bdev.
 */
static int bdev_rbd_count = 0;

/*
 * Period passed to spdk_poller_register() for the per-channel completion
 * poller (presumably microseconds, going by the name - confirm against the
 * SPDK poller API).
 */
#define BDEV_RBD_POLL_US 50
58
/* Per-bdev state for one RBD image exposed as an SPDK block device. */
struct bdev_rbd {
	/* The SPDK bdev registered for this image; disk.ctxt points back here. */
	struct spdk_bdev disk;
	/* Heap copy of the RBD image name. */
	char *rbd_name;
	/* Heap copy of the rados user id, or NULL when none was supplied. */
	char *user_id;
	/* Heap copy of the rados pool name. */
	char *pool_name;
	/*
	 * Optional NULL-terminated array of heap strings, consumed two entries
	 * at a time as key/value pairs; NULL means "read the default conf file".
	 */
	char **config;
	/* Image information filled in by rbd_stat() when the bdev is created. */
	rbd_image_info_t info;
	/* List linkage; not referenced by the code visible in this file. */
	TAILQ_ENTRY(bdev_rbd) tailq;
	/* Poller armed by bdev_rbd_reset() to complete a pending reset. */
	struct spdk_poller *reset_timer;
	/* The reset request currently waiting on reset_timer, or NULL. */
	struct spdk_bdev_io *reset_bdev_io;
};
70
/* Per-I/O-channel state; each channel opens its own connection and image. */
struct bdev_rbd_io_channel {
	/* Pool I/O context created by bdev_rados_context_init(). */
	rados_ioctx_t io_ctx;
	/* Cluster handle created by bdev_rados_context_init(). */
	rados_t cluster;
	/* eventfd registered with the image for completion notification. */
	struct pollfd pfd;
	/* Image handle opened in bdev_rbd_handle(). */
	rbd_image_t image;
	/* Owning bdev (the io_device this channel was created for). */
	struct bdev_rbd *disk;
	/* Poller running bdev_rbd_io_poll() for this channel. */
	struct spdk_poller *poller;
};
79
/* Per-request driver context stored in spdk_bdev_io::driver_ctx. */
struct bdev_rbd_io {
	/*
	 * Sum of the submitted segment lengths; reduced by each positive read
	 * return value, so a non-zero value at the end means a short read.
	 */
	uint64_t remaining_len;
	/* Number of aio segments submitted and not yet reaped. */
	int num_segments;
	/* Set when any segment failed; decides the final completion status. */
	bool failed;
};
85
86 static void
87 bdev_rbd_free(struct bdev_rbd *rbd)
88 {
89 if (!rbd) {
90 return;
91 }
92
93 free(rbd->disk.name);
94 free(rbd->rbd_name);
95 free(rbd->user_id);
96 free(rbd->pool_name);
97 spdk_bdev_rbd_free_config(rbd->config);
98 free(rbd);
99 }
100
/*
 * Free a NULL-terminated array of heap-allocated strings, including the
 * array itself. A NULL array is accepted and ignored.
 */
void
spdk_bdev_rbd_free_config(char **config)
{
	size_t idx;

	if (config == NULL) {
		return;
	}

	for (idx = 0; config[idx] != NULL; idx++) {
		free(config[idx]);
	}
	free(config);
}
113
/*
 * Deep-copy a NULL-terminated string array.
 *
 * Returns a newly allocated NULL-terminated array that the caller releases
 * with spdk_bdev_rbd_free_config(), or NULL when config is NULL or an
 * allocation fails.
 */
char **
spdk_bdev_rbd_dup_config(const char *const *config)
{
	char **copy;
	size_t n_entries = 0;
	size_t idx;

	if (config == NULL) {
		return NULL;
	}

	while (config[n_entries] != NULL) {
		n_entries++;
	}

	/* One extra zeroed slot serves as the terminator. */
	copy = calloc(n_entries + 1, sizeof(*copy));
	if (copy == NULL) {
		return NULL;
	}

	for (idx = 0; idx < n_entries; idx++) {
		copy[idx] = strdup(config[idx]);
		if (copy[idx] == NULL) {
			/* The partial copy is still NULL-terminated thanks to calloc(). */
			spdk_bdev_rbd_free_config(copy);
			return NULL;
		}
	}

	return copy;
}
136
137 static int
138 bdev_rados_context_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
139 rados_t *cluster, rados_ioctx_t *io_ctx)
140 {
141 int ret;
142
143 ret = rados_create(cluster, user_id);
144 if (ret < 0) {
145 SPDK_ERRLOG("Failed to create rados_t struct\n");
146 return -1;
147 }
148
149 if (config) {
150 const char *const *entry = config;
151 while (*entry) {
152 ret = rados_conf_set(*cluster, entry[0], entry[1]);
153 if (ret < 0) {
154 SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
155 rados_shutdown(*cluster);
156 return -1;
157 }
158 entry += 2;
159 }
160 } else {
161 ret = rados_conf_read_file(*cluster, NULL);
162 if (ret < 0) {
163 SPDK_ERRLOG("Failed to read conf file\n");
164 rados_shutdown(*cluster);
165 return -1;
166 }
167 }
168
169 ret = rados_connect(*cluster);
170 if (ret < 0) {
171 SPDK_ERRLOG("Failed to connect to rbd_pool\n");
172 rados_shutdown(*cluster);
173 return -1;
174 }
175
176 ret = rados_ioctx_create(*cluster, rbd_pool_name, io_ctx);
177
178 if (ret < 0) {
179 SPDK_ERRLOG("Failed to create ioctx\n");
180 rados_shutdown(*cluster);
181 return -1;
182 }
183
184 return 0;
185 }
186
187 static int
188 bdev_rbd_init(const char *user_id, const char *rbd_pool_name, const char *const *config,
189 const char *rbd_name, rbd_image_info_t *info)
190 {
191 int ret;
192 rados_t cluster = NULL;
193 rados_ioctx_t io_ctx = NULL;
194 rbd_image_t image = NULL;
195
196 ret = bdev_rados_context_init(user_id, rbd_pool_name, config, &cluster, &io_ctx);
197 if (ret < 0) {
198 SPDK_ERRLOG("Failed to create rados context for user_id=%s and rbd_pool=%s\n",
199 user_id ? user_id : "admin (the default)", rbd_pool_name);
200 return -1;
201 }
202
203 ret = rbd_open(io_ctx, rbd_name, &image, NULL);
204 if (ret < 0) {
205 SPDK_ERRLOG("Failed to open specified rbd device\n");
206 goto err;
207 }
208 ret = rbd_stat(image, info, sizeof(*info));
209 rbd_close(image);
210 if (ret < 0) {
211 SPDK_ERRLOG("Failed to stat specified rbd device\n");
212 goto err;
213 }
214
215 rados_ioctx_destroy(io_ctx);
216 return 0;
217 err:
218 rados_ioctx_destroy(io_ctx);
219 rados_shutdown(cluster);
220 return -1;
221 }
222
223 static void
224 bdev_rbd_exit(rbd_image_t image)
225 {
226 rbd_flush(image);
227 rbd_close(image);
228 }
229
230 static void
231 bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
232 {
233 /* Doing nothing here */
234 }
235
236 static int
237 bdev_rbd_start_aio(rbd_image_t image, struct spdk_bdev_io *bdev_io,
238 void *buf, uint64_t offset, size_t len)
239 {
240 int ret;
241 rbd_completion_t comp;
242
243 ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
244 &comp);
245 if (ret < 0) {
246 return -1;
247 }
248
249 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
250 ret = rbd_aio_read(image, offset, len,
251 buf, comp);
252 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
253 ret = rbd_aio_write(image, offset, len,
254 buf, comp);
255 } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
256 ret = rbd_aio_flush(image, comp);
257 }
258
259 if (ret < 0) {
260 rbd_aio_release(comp);
261 return -1;
262 }
263
264 return 0;
265 }
266
267 static int bdev_rbd_library_init(void);
268
269 static int
270 bdev_rbd_get_ctx_size(void)
271 {
272 return sizeof(struct bdev_rbd_io);
273 }
274
/*
 * Module descriptor for the RBD bdev module: names the module and wires up
 * its init hook and per-I/O context size query.
 */
static struct spdk_bdev_module rbd_if = {
	.name = "rbd",
	.module_init = bdev_rbd_library_init,
	.get_ctx_size = bdev_rbd_get_ctx_size,

};
/* Hand the descriptor to SPDK's bdev module registration macro. */
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
282
283 static int64_t
284 bdev_rbd_rw(struct bdev_rbd *disk, struct spdk_io_channel *ch,
285 struct spdk_bdev_io *bdev_io, struct iovec *iov,
286 int iovcnt, size_t len, uint64_t offset)
287 {
288 struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
289 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
290 size_t remaining = len;
291 int i, rc;
292
293 rbd_io->remaining_len = 0;
294 rbd_io->num_segments = 0;
295 rbd_io->failed = false;
296
297 for (i = 0; i < iovcnt && remaining > 0; i++) {
298 size_t seg_len = spdk_min(remaining, iov[i].iov_len);
299
300 rc = bdev_rbd_start_aio(rbdio_ch->image, bdev_io, iov[i].iov_base, offset, seg_len);
301 if (rc) {
302 /*
303 * This bdev_rbd_start_aio() call failed, but if any previous ones were
304 * submitted, we need to wait for them to finish.
305 */
306 if (rbd_io->num_segments == 0) {
307 /* No previous I/O submitted - return error code immediately. */
308 return rc;
309 }
310
311 /* Return and wait for outstanding I/O to complete. */
312 rbd_io->failed = true;
313 return 0;
314 }
315
316 rbd_io->num_segments++;
317 rbd_io->remaining_len += seg_len;
318
319 offset += seg_len;
320 remaining -= seg_len;
321 }
322
323 return 0;
324 }
325
326 static int64_t
327 bdev_rbd_flush(struct bdev_rbd *disk, struct spdk_io_channel *ch,
328 struct spdk_bdev_io *bdev_io, uint64_t offset, uint64_t nbytes)
329 {
330 struct bdev_rbd_io_channel *rbdio_ch = spdk_io_channel_get_ctx(ch);
331
332 return bdev_rbd_start_aio(rbdio_ch->image, bdev_io, NULL, offset, nbytes);
333 }
334
335 static int
336 bdev_rbd_reset_timer(void *arg)
337 {
338 struct bdev_rbd *disk = arg;
339
340 /*
341 * TODO: This should check if any I/O is still in flight before completing the reset.
342 * For now, just complete after the timer expires.
343 */
344 spdk_bdev_io_complete(disk->reset_bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
345 spdk_poller_unregister(&disk->reset_timer);
346 disk->reset_bdev_io = NULL;
347
348 return -1;
349 }
350
351 static int
352 bdev_rbd_reset(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io)
353 {
354 /*
355 * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
356 * timer to wait for in-flight I/O to complete.
357 */
358 assert(disk->reset_bdev_io == NULL);
359 disk->reset_bdev_io = bdev_io;
360 disk->reset_timer = spdk_poller_register(bdev_rbd_reset_timer, disk, 1 * 1000 * 1000);
361
362 return 0;
363 }
364
/* Unregister callback: the io_device is fully gone, so its memory can go. */
static void
bdev_rbd_free_cb(void *io_device)
{
	bdev_rbd_free((struct bdev_rbd *)io_device);
}

/*
 * bdev destruct hook. Unregisters the io_device and defers freeing the
 * bdev_rbd object to the unregister callback, because unregistration may
 * finish asynchronously while I/O channels are still being released.
 * Always returns 0.
 */
static int
bdev_rbd_destruct(void *ctx)
{
	struct bdev_rbd *rbd = ctx;

	spdk_io_device_unregister(rbd, bdev_rbd_free_cb);

	return 0;
}
375
376 static void
377 bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
378 bool success)
379 {
380 int ret;
381
382 if (!success) {
383 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
384 return;
385 }
386
387 ret = bdev_rbd_rw(bdev_io->bdev->ctxt,
388 ch,
389 bdev_io,
390 bdev_io->u.bdev.iovs,
391 bdev_io->u.bdev.iovcnt,
392 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
393 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
394
395 if (ret != 0) {
396 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
397 }
398 }
399
400 static int _bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
401 {
402 switch (bdev_io->type) {
403 case SPDK_BDEV_IO_TYPE_READ:
404 spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
405 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
406 return 0;
407
408 case SPDK_BDEV_IO_TYPE_WRITE:
409 return bdev_rbd_rw((struct bdev_rbd *)bdev_io->bdev->ctxt,
410 ch,
411 bdev_io,
412 bdev_io->u.bdev.iovs,
413 bdev_io->u.bdev.iovcnt,
414 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
415 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
416
417 case SPDK_BDEV_IO_TYPE_FLUSH:
418 return bdev_rbd_flush((struct bdev_rbd *)bdev_io->bdev->ctxt,
419 ch,
420 bdev_io,
421 bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
422 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
423
424 case SPDK_BDEV_IO_TYPE_RESET:
425 return bdev_rbd_reset((struct bdev_rbd *)bdev_io->bdev->ctxt,
426 bdev_io);
427
428 default:
429 return -1;
430 }
431 return 0;
432 }
433
434 static void bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
435 {
436 if (_bdev_rbd_submit_request(ch, bdev_io) < 0) {
437 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
438 }
439 }
440
441 static bool
442 bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
443 {
444 switch (io_type) {
445 case SPDK_BDEV_IO_TYPE_READ:
446 case SPDK_BDEV_IO_TYPE_WRITE:
447 case SPDK_BDEV_IO_TYPE_FLUSH:
448 case SPDK_BDEV_IO_TYPE_RESET:
449 return true;
450
451 default:
452 return false;
453 }
454 }
455
456 static int
457 bdev_rbd_io_poll(void *arg)
458 {
459 struct bdev_rbd_io_channel *ch = arg;
460 int i, io_status, rc;
461 rbd_completion_t comps[SPDK_RBD_QUEUE_DEPTH];
462 struct spdk_bdev_io *bdev_io;
463 struct bdev_rbd_io *rbd_io;
464
465 rc = poll(&ch->pfd, 1, 0);
466
467 /* check the return value of poll since we have only one fd for each channel */
468 if (rc != 1) {
469 return 0;
470 }
471
472 rc = rbd_poll_io_events(ch->image, comps, SPDK_RBD_QUEUE_DEPTH);
473 for (i = 0; i < rc; i++) {
474 bdev_io = rbd_aio_get_arg(comps[i]);
475 rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
476 io_status = rbd_aio_get_return_value(comps[i]);
477
478 assert(rbd_io->num_segments > 0);
479 rbd_io->num_segments--;
480
481 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
482 if (io_status > 0) {
483 /* For reads, io_status is the length */
484 rbd_io->remaining_len -= io_status;
485 }
486
487 if (rbd_io->num_segments == 0 && rbd_io->remaining_len != 0) {
488 rbd_io->failed = true;
489 }
490 } else {
491 /* For others, 0 means success */
492 if (io_status != 0) {
493 rbd_io->failed = true;
494 }
495 }
496
497 rbd_aio_release(comps[i]);
498
499 if (rbd_io->num_segments == 0) {
500 spdk_bdev_io_complete(bdev_io,
501 rbd_io->failed ? SPDK_BDEV_IO_STATUS_FAILED : SPDK_BDEV_IO_STATUS_SUCCESS);
502 }
503 }
504
505 return rc;
506 }
507
508 static void
509 bdev_rbd_free_channel(struct bdev_rbd_io_channel *ch)
510 {
511 if (!ch) {
512 return;
513 }
514
515 if (ch->image) {
516 bdev_rbd_exit(ch->image);
517 }
518
519 if (ch->io_ctx) {
520 rados_ioctx_destroy(ch->io_ctx);
521 }
522
523 if (ch->cluster) {
524 rados_shutdown(ch->cluster);
525 }
526
527 if (ch->pfd.fd >= 0) {
528 close(ch->pfd.fd);
529 }
530 }
531
532 static void *
533 bdev_rbd_handle(void *arg)
534 {
535 struct bdev_rbd_io_channel *ch = arg;
536 void *ret = arg;
537
538 if (rbd_open(ch->io_ctx, ch->disk->rbd_name, &ch->image, NULL) < 0) {
539 SPDK_ERRLOG("Failed to open specified rbd device\n");
540 ret = NULL;
541 }
542
543 return ret;
544 }
545
546 static int
547 bdev_rbd_create_cb(void *io_device, void *ctx_buf)
548 {
549 struct bdev_rbd_io_channel *ch = ctx_buf;
550 int ret;
551
552 ch->disk = io_device;
553 ch->image = NULL;
554 ch->io_ctx = NULL;
555 ch->pfd.fd = -1;
556
557 ret = bdev_rados_context_init(ch->disk->user_id, ch->disk->pool_name,
558 (const char *const *)ch->disk->config,
559 &ch->cluster, &ch->io_ctx);
560 if (ret < 0) {
561 SPDK_ERRLOG("Failed to create rados context for user_id %s and rbd_pool=%s\n",
562 ch->disk->user_id ? ch->disk->user_id : "admin (the default)", ch->disk->pool_name);
563 goto err;
564 }
565
566 if (spdk_call_unaffinitized(bdev_rbd_handle, ch) == NULL) {
567 goto err;
568 }
569
570 ch->pfd.fd = eventfd(0, EFD_NONBLOCK);
571 if (ch->pfd.fd < 0) {
572 SPDK_ERRLOG("Failed to get eventfd\n");
573 goto err;
574 }
575
576 ch->pfd.events = POLLIN;
577 ret = rbd_set_image_notification(ch->image, ch->pfd.fd, EVENT_TYPE_EVENTFD);
578 if (ret < 0) {
579 SPDK_ERRLOG("Failed to set rbd image notification\n");
580 goto err;
581 }
582
583 ch->poller = spdk_poller_register(bdev_rbd_io_poll, ch, BDEV_RBD_POLL_US);
584
585 return 0;
586
587 err:
588 bdev_rbd_free_channel(ch);
589 return -1;
590 }
591
592 static void
593 bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
594 {
595 struct bdev_rbd_io_channel *io_channel = ctx_buf;
596
597 bdev_rbd_free_channel(io_channel);
598
599 spdk_poller_unregister(&io_channel->poller);
600 }
601
/* get_io_channel hook: the bdev_rbd object itself is the registered io_device. */
static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
	struct bdev_rbd *io_device = ctx;
	struct spdk_io_channel *channel;

	channel = spdk_get_io_channel(io_device);
	return channel;
}
609
610 static int
611 bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
612 {
613 struct bdev_rbd *rbd_bdev = ctx;
614
615 spdk_json_write_named_object_begin(w, "rbd");
616
617 spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);
618
619 spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);
620
621 if (rbd_bdev->user_id) {
622 spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
623 }
624
625 if (rbd_bdev->config) {
626 char **entry = rbd_bdev->config;
627
628 spdk_json_write_named_object_begin(w, "config");
629 while (*entry) {
630 spdk_json_write_named_string(w, entry[0], entry[1]);
631 entry += 2;
632 }
633 spdk_json_write_object_end(w);
634 }
635
636 spdk_json_write_object_end(w);
637
638 return 0;
639 }
640
641 static void
642 bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
643 {
644 struct bdev_rbd *rbd = bdev->ctxt;
645
646 spdk_json_write_object_begin(w);
647
648 spdk_json_write_named_string(w, "method", "construct_rbd_bdev");
649
650 spdk_json_write_named_object_begin(w, "params");
651 spdk_json_write_named_string(w, "name", bdev->name);
652 spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
653 spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
654 spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
655 if (rbd->user_id) {
656 spdk_json_write_named_string(w, "user_id", rbd->user_id);
657 }
658
659 if (rbd->config) {
660 char **entry = rbd->config;
661
662 spdk_json_write_named_object_begin(w, "config");
663 while (*entry) {
664 spdk_json_write_named_string(w, entry[0], entry[1]);
665 entry += 2;
666 }
667 spdk_json_write_object_end(w);
668 }
669
670 spdk_json_write_object_end(w);
671
672 spdk_json_write_object_end(w);
673 }
674
/* bdev function table wiring this module's hooks into every RBD bdev. */
static const struct spdk_bdev_fn_table rbd_fn_table = {
	.destruct = bdev_rbd_destruct,
	.submit_request = bdev_rbd_submit_request,
	.io_type_supported = bdev_rbd_io_type_supported,
	.get_io_channel = bdev_rbd_get_io_channel,
	.dump_info_json = bdev_rbd_dump_info_json,
	.write_config_json = bdev_rbd_write_config_json,
};
683
684 struct spdk_bdev *
685 spdk_bdev_rbd_create(const char *name, const char *user_id, const char *pool_name,
686 const char *const *config,
687 const char *rbd_name,
688 uint32_t block_size)
689 {
690 struct bdev_rbd *rbd;
691 int ret;
692
693 if ((pool_name == NULL) || (rbd_name == NULL)) {
694 return NULL;
695 }
696
697 rbd = calloc(1, sizeof(struct bdev_rbd));
698 if (rbd == NULL) {
699 SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
700 return NULL;
701 }
702
703 rbd->rbd_name = strdup(rbd_name);
704 if (!rbd->rbd_name) {
705 bdev_rbd_free(rbd);
706 return NULL;
707 }
708
709 if (user_id) {
710 rbd->user_id = strdup(user_id);
711 if (!rbd->user_id) {
712 bdev_rbd_free(rbd);
713 return NULL;
714 }
715 }
716
717 rbd->pool_name = strdup(pool_name);
718 if (!rbd->pool_name) {
719 bdev_rbd_free(rbd);
720 return NULL;
721 }
722
723 if (config && !(rbd->config = spdk_bdev_rbd_dup_config(config))) {
724 bdev_rbd_free(rbd);
725 return NULL;
726 }
727
728 ret = bdev_rbd_init(rbd->user_id, rbd->pool_name,
729 (const char *const *)rbd->config,
730 rbd_name, &rbd->info);
731 if (ret < 0) {
732 bdev_rbd_free(rbd);
733 SPDK_ERRLOG("Failed to init rbd device\n");
734 return NULL;
735 }
736
737 if (name) {
738 rbd->disk.name = strdup(name);
739 } else {
740 rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
741 }
742 if (!rbd->disk.name) {
743 bdev_rbd_free(rbd);
744 return NULL;
745 }
746 rbd->disk.product_name = "Ceph Rbd Disk";
747 bdev_rbd_count++;
748
749 rbd->disk.write_cache = 0;
750 rbd->disk.blocklen = block_size;
751 rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
752 rbd->disk.ctxt = rbd;
753 rbd->disk.fn_table = &rbd_fn_table;
754 rbd->disk.module = &rbd_if;
755
756 SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);
757
758 spdk_io_device_register(rbd, bdev_rbd_create_cb,
759 bdev_rbd_destroy_cb,
760 sizeof(struct bdev_rbd_io_channel),
761 rbd_name);
762 ret = spdk_bdev_register(&rbd->disk);
763 if (ret) {
764 spdk_io_device_unregister(rbd, NULL);
765 bdev_rbd_free(rbd);
766 return NULL;
767 }
768
769 return &rbd->disk;
770 }
771
772 void
773 spdk_bdev_rbd_delete(struct spdk_bdev *bdev, spdk_delete_rbd_complete cb_fn, void *cb_arg)
774 {
775 if (!bdev || bdev->module != &rbd_if) {
776 cb_fn(cb_arg, -ENODEV);
777 return;
778 }
779
780 spdk_bdev_unregister(bdev, cb_fn, cb_arg);
781 }
782
/*
 * Module init hook: creates one RBD bdev per "Ceph" entry found in the
 * [Ceph] section of the legacy configuration file.
 *
 * Each entry provides a pool name, an image name and an optional block
 * size (default 512). The block size must be a positive multiple of 512
 * that fits in 32 bits; larger values used to be truncated silently.
 *
 * Returns 0 when the section is absent or every entry was created, and -1
 * on the first malformed entry or creation failure.
 */
static int
bdev_rbd_library_init(void)
{
	int i;
	const char *val;
	const char *pool_name;
	const char *rbd_name;
	uint32_t block_size;
	long int tmp;

	struct spdk_conf_section *sp = spdk_conf_find_section(NULL, "Ceph");

	if (sp == NULL) {
		/*
		 * Ceph section not found. Do not initialize any rbd LUNS.
		 */
		return 0;
	}

	/* Init rbd block devices */
	for (i = 0; ; i++) {
		val = spdk_conf_section_get_nval(sp, "Ceph", i);
		if (val == NULL) {
			break;
		}

		/* get the Rbd_pool name */
		pool_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 0);
		if (pool_name == NULL) {
			SPDK_ERRLOG("Ceph%d: rbd pool name needs to be provided\n", i);
			return -1;
		}

		rbd_name = spdk_conf_section_get_nmval(sp, "Ceph", i, 1);
		if (rbd_name == NULL) {
			SPDK_ERRLOG("Ceph%d: format error\n", i);
			return -1;
		}

		val = spdk_conf_section_get_nmval(sp, "Ceph", i, 2);

		if (val == NULL) {
			block_size = 512; /* default value */
		} else {
			tmp = spdk_strtol(val, 10);
			/* Reject values that would not survive the cast to uint32_t. */
			if (tmp <= 0 || (unsigned long)tmp > UINT32_MAX) {
				SPDK_ERRLOG("Invalid block size\n");
				return -1;
			} else if (tmp & 0x1ff) {
				SPDK_ERRLOG("current block_size = %ld, it should be multiple of 512\n",
					    tmp);
				return -1;
			}
			block_size = (uint32_t)tmp;
		}

		/* TODO(?): user_id and rbd config values */
		if (spdk_bdev_rbd_create(NULL, NULL, pool_name, NULL, rbd_name, block_size) == NULL) {
			return -1;
		}
	}

	return 0;
}
853
/* Register the "bdev_rbd" log component under the SPDK_LOG_BDEV_RBD flag. */
SPDK_LOG_REGISTER_COMPONENT("bdev_rbd", SPDK_LOG_BDEV_RBD)