]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/image/CreateRequest.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / librbd / image / CreateRequest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/image/CreateRequest.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "cls/rbd/cls_rbd_client.h"
8 #include "include/assert.h"
9 #include "librbd/Utils.h"
10 #include "common/ceph_context.h"
11 #include "librbd/Journal.h"
12 #include "librbd/MirroringWatcher.h"
13 #include "librbd/journal/CreateRequest.h"
14 #include "librbd/journal/RemoveRequest.h"
15 #include "librbd/mirror/EnableRequest.h"
16 #include "librbd/io/AioCompletion.h"
17 #include "journal/Journaler.h"
18
19 #define dout_subsys ceph_subsys_rbd
20 #undef dout_prefix
21 #define dout_prefix *_dout << "librbd::image::CreateRequest: " << __func__ \
22 << ": "
23
24 namespace librbd {
25 namespace image {
26
27 using util::create_rados_callback;
28 using util::create_context_callback;
29
30 namespace {
31
32 int validate_features(CephContext *cct, uint64_t features,
33 bool force_non_primary) {
34 if (features & ~RBD_FEATURES_ALL) {
35 lderr(cct) << "librbd does not support requested features." << dendl;
36 return -ENOSYS;
37 }
38 if ((features & RBD_FEATURE_FAST_DIFF) != 0 &&
39 (features & RBD_FEATURE_OBJECT_MAP) == 0) {
40 lderr(cct) << "cannot use fast diff without object map" << dendl;
41 return -EINVAL;
42 }
43 if ((features & RBD_FEATURE_OBJECT_MAP) != 0 &&
44 (features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
45 lderr(cct) << "cannot use object map without exclusive lock" << dendl;
46 return -EINVAL;
47 }
48 if ((features & RBD_FEATURE_JOURNALING) != 0) {
49 if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
50 lderr(cct) << "cannot use journaling without exclusive lock" << dendl;
51 return -EINVAL;
52 }
53 } else if (force_non_primary) {
54 assert(false);
55 }
56
57 return 0;
58 }
59
60 int validate_striping(CephContext *cct, uint8_t order, uint64_t stripe_unit,
61 uint64_t stripe_count) {
62 if ((stripe_unit && !stripe_count) ||
63 (!stripe_unit && stripe_count)) {
64 lderr(cct) << "must specify both (or neither) of stripe-unit and "
65 << "stripe-count" << dendl;
66 return -EINVAL;
67 } else if (stripe_unit || stripe_count) {
68 if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
69 lderr(cct) << "stripe unit is not a factor of the object size" << dendl;
70 return -EINVAL;
71 }
72 }
73 return 0;
74 }
75
76 int validate_data_pool(CephContext *cct, IoCtx &io_ctx, uint64_t features,
77 const std::string &data_pool, int64_t *data_pool_id) {
78 if ((features & RBD_FEATURE_DATA_POOL) == 0) {
79 return 0;
80 }
81
82 librados::Rados rados(io_ctx);
83 librados::IoCtx data_io_ctx;
84 int r = rados.ioctx_create(data_pool.c_str(), data_io_ctx);
85 if (r < 0) {
86 lderr(cct) << "data pool " << data_pool << " does not exist" << dendl;
87 return -ENOENT;
88 }
89
90 *data_pool_id = data_io_ctx.get_id();
91 return 0;
92 }
93
94
95 bool validate_layout(CephContext *cct, uint64_t size, file_layout_t &layout) {
96 if (!librbd::ObjectMap<>::is_compatible(layout, size)) {
97 lderr(cct) << "image size not compatible with object map" << dendl;
98 return false;
99 }
100
101 return true;
102 }
103
104 int get_image_option(const ImageOptions &image_options, int option,
105 uint8_t *value) {
106 uint64_t large_value;
107 int r = image_options.get(option, &large_value);
108 if (r < 0) {
109 return r;
110 }
111 *value = static_cast<uint8_t>(large_value);
112 return 0;
113 }
114
115 } // anonymous namespace
116
117 template<typename I>
118 int CreateRequest<I>::validate_order(CephContext *cct, uint8_t order) {
119 if (order > 25 || order < 12) {
120 lderr(cct) << "order must be in the range [12, 25]" << dendl;
121 return -EDOM;
122 }
123 return 0;
124 }
125
126 #undef dout_prefix
127 #define dout_prefix *_dout << "librbd::image::CreateRequest: " << this << " " \
128 << __func__ << ": "
129
130 template<typename I>
131 CreateRequest<I>::CreateRequest(IoCtx &ioctx, const std::string &image_name,
132 const std::string &image_id, uint64_t size,
133 const ImageOptions &image_options,
134 const std::string &non_primary_global_image_id,
135 const std::string &primary_mirror_uuid,
136 bool skip_mirror_enable,
137 ContextWQ *op_work_queue, Context *on_finish)
138 : m_ioctx(ioctx), m_image_name(image_name), m_image_id(image_id),
139 m_size(size), m_non_primary_global_image_id(non_primary_global_image_id),
140 m_primary_mirror_uuid(primary_mirror_uuid),
141 m_skip_mirror_enable(skip_mirror_enable),
142 m_op_work_queue(op_work_queue), m_on_finish(on_finish) {
143 m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
144
145 m_id_obj = util::id_obj_name(m_image_name);
146 m_header_obj = util::header_name(m_image_id);
147 m_objmap_name = ObjectMap<>::object_map_name(m_image_id, CEPH_NOSNAP);
148
149 if (image_options.get(RBD_IMAGE_OPTION_FEATURES, &m_features) != 0) {
150 m_features = util::get_rbd_default_features(m_cct);
151 m_negotiate_features = true;
152 }
153
154 uint64_t features_clear = 0;
155 uint64_t features_set = 0;
156 image_options.get(RBD_IMAGE_OPTION_FEATURES_CLEAR, &features_clear);
157 image_options.get(RBD_IMAGE_OPTION_FEATURES_SET, &features_set);
158
159 uint64_t features_conflict = features_clear & features_set;
160 features_clear &= ~features_conflict;
161 features_set &= ~features_conflict;
162 m_features |= features_set;
163 m_features &= ~features_clear;
164
165 if (image_options.get(RBD_IMAGE_OPTION_STRIPE_UNIT, &m_stripe_unit) != 0 ||
166 m_stripe_unit == 0) {
167 m_stripe_unit = m_cct->_conf->rbd_default_stripe_unit;
168 }
169 if (image_options.get(RBD_IMAGE_OPTION_STRIPE_COUNT, &m_stripe_count) != 0 ||
170 m_stripe_count == 0) {
171 m_stripe_count = m_cct->_conf->rbd_default_stripe_count;
172 }
173 if (get_image_option(image_options, RBD_IMAGE_OPTION_ORDER, &m_order) != 0 ||
174 m_order == 0) {
175 m_order = m_cct->_conf->rbd_default_order;
176 }
177 if (get_image_option(image_options, RBD_IMAGE_OPTION_JOURNAL_ORDER,
178 &m_journal_order) != 0) {
179 m_journal_order = m_cct->_conf->rbd_journal_order;
180 }
181 if (get_image_option(image_options, RBD_IMAGE_OPTION_JOURNAL_SPLAY_WIDTH,
182 &m_journal_splay_width) != 0) {
183 m_journal_splay_width = m_cct->_conf->rbd_journal_splay_width;
184 }
185 if (image_options.get(RBD_IMAGE_OPTION_JOURNAL_POOL, &m_journal_pool) != 0) {
186 m_journal_pool = m_cct->_conf->rbd_journal_pool;
187 }
188 if (image_options.get(RBD_IMAGE_OPTION_DATA_POOL, &m_data_pool) != 0) {
189 m_data_pool = m_cct->_conf->rbd_default_data_pool;
190 }
191
192 m_layout.object_size = 1ull << m_order;
193 if (m_stripe_unit == 0 || m_stripe_count == 0) {
194 m_layout.stripe_unit = m_layout.object_size;
195 m_layout.stripe_count = 1;
196 } else {
197 m_layout.stripe_unit = m_stripe_unit;
198 m_layout.stripe_count = m_stripe_count;
199 }
200
201 m_force_non_primary = !non_primary_global_image_id.empty();
202
203 if (!m_data_pool.empty() && m_data_pool != ioctx.get_pool_name()) {
204 m_features |= RBD_FEATURE_DATA_POOL;
205 } else {
206 m_data_pool.clear();
207 m_features &= ~RBD_FEATURE_DATA_POOL;
208 }
209
210 if ((m_stripe_unit != 0 && m_stripe_unit != (1ULL << m_order)) ||
211 (m_stripe_count != 0 && m_stripe_count != 1)) {
212 m_features |= RBD_FEATURE_STRIPINGV2;
213 } else {
214 m_features &= ~RBD_FEATURE_STRIPINGV2;
215 }
216
217 ldout(m_cct, 20) << "name=" << m_image_name << ", "
218 << "id=" << m_image_id << ", "
219 << "size=" << m_size << ", "
220 << "features=" << m_features << ", "
221 << "order=" << (uint64_t)m_order << ", "
222 << "stripe_unit=" << m_stripe_unit << ", "
223 << "stripe_count=" << m_stripe_count << ", "
224 << "journal_order=" << (uint64_t)m_journal_order << ", "
225 << "journal_splay_width="
226 << (uint64_t)m_journal_splay_width << ", "
227 << "journal_pool=" << m_journal_pool << ", "
228 << "data_pool=" << m_data_pool << dendl;
229 }
230
231 template<typename I>
232 void CreateRequest<I>::send() {
233 ldout(m_cct, 20) << dendl;
234
235 int r = validate_features(m_cct, m_features, m_force_non_primary);
236 if (r < 0) {
237 complete(r);
238 return;
239 }
240
241 r = validate_order(m_cct, m_order);
242 if (r < 0) {
243 complete(r);
244 return;
245 }
246
247 r = validate_striping(m_cct, m_order, m_stripe_unit, m_stripe_count);
248 if (r < 0) {
249 complete(r);
250 return;
251 }
252
253 r = validate_data_pool(m_cct, m_ioctx, m_features, m_data_pool,
254 &m_data_pool_id);
255 if (r < 0) {
256 complete(r);
257 return;
258 }
259
260 if (!validate_layout(m_cct, m_size, m_layout)) {
261 complete(-EINVAL);
262 return;
263 }
264
265 validate_pool();
266 }
267
268 template<typename I>
269 void CreateRequest<I>::validate_pool() {
270 if (!m_cct->_conf->rbd_validate_pool) {
271 create_id_object();
272 return;
273 }
274
275 ldout(m_cct, 20) << dendl;
276
277 using klass = CreateRequest<I>;
278 librados::AioCompletion *comp =
279 create_rados_callback<klass, &klass::handle_validate_pool>(this);
280
281 librados::ObjectReadOperation op;
282 op.stat(NULL, NULL, NULL);
283
284 m_outbl.clear();
285 int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op, &m_outbl);
286 assert(r == 0);
287 comp->release();
288 }
289
290 template<typename I>
291 void CreateRequest<I>::handle_validate_pool(int r) {
292 ldout(m_cct, 20) << "r=" << r << dendl;
293
294 if (r == 0) {
295 validate_overwrite();
296 return;
297 } else if ((r < 0) && (r != -ENOENT)) {
298 lderr(m_cct) << "failed to stat RBD directory: " << cpp_strerror(r)
299 << dendl;
300 complete(r);
301 return;
302 }
303
304 // allocate a self-managed snapshot id if this a new pool to force
305 // self-managed snapshot mode
306 // This call is executed just once per (fresh) pool, hence we do not
307 // try hard to make it asynchronous (and it's pretty safe not to cause
308 // deadlocks).
309
310 uint64_t snap_id;
311 r = m_ioctx.selfmanaged_snap_create(&snap_id);
312 if (r == -EINVAL) {
313 lderr(m_cct) << "pool not configured for self-managed RBD snapshot support"
314 << dendl;
315 complete(r);
316 return;
317 } else if (r < 0) {
318 lderr(m_cct) << "failed to allocate self-managed snapshot: "
319 << cpp_strerror(r) << dendl;
320 complete(r);
321 return;
322 }
323
324 r = m_ioctx.selfmanaged_snap_remove(snap_id);
325 if (r < 0) {
326 // we've already switched to self-managed snapshots -- no need to
327 // error out in case of failure here.
328 ldout(m_cct, 10) << "failed to release self-managed snapshot " << snap_id
329 << ": " << cpp_strerror(r) << dendl;
330 }
331
332 validate_overwrite();
333 }
334
335 template <typename I>
336 void CreateRequest<I>::validate_overwrite() {
337 ldout(m_cct, 20) << dendl;
338
339 m_data_io_ctx = m_ioctx;
340 if (m_data_pool_id != -1) {
341 librados::Rados rados(m_ioctx);
342 int r = rados.ioctx_create2(m_data_pool_id, m_data_io_ctx);
343 if (r < 0) {
344 lderr(m_cct) << "data pool " << m_data_pool << " does not exist" << dendl;
345 complete(r);
346 return;
347 }
348 }
349
350 using klass = CreateRequest<I>;
351 librados::AioCompletion *comp =
352 create_rados_callback<klass, &klass::handle_validate_overwrite>(this);
353
354 librados::ObjectReadOperation op;
355 op.read(0, 0, nullptr, nullptr);
356
357 m_outbl.clear();
358 int r = m_data_io_ctx.aio_operate(RBD_INFO, comp, &op, &m_outbl);
359 assert(r == 0);
360 comp->release();
361 }
362
363 template <typename I>
364 void CreateRequest<I>::handle_validate_overwrite(int r) {
365 ldout(m_cct, 20) << "r=" << r << dendl;
366
367 bufferlist bl;
368 bl.append("overwrite validated");
369
370 if (r == 0 && m_outbl.contents_equal(bl)) {
371 create_id_object();
372 return;
373 } else if ((r < 0) && (r != -ENOENT)) {
374 lderr(m_cct) << "failed to read RBD info: " << cpp_strerror(r) << dendl;
375 complete(r);
376 return;
377 }
378
379 // validate the pool supports overwrites. We cannot use rbd_directory
380 // since the v1 images store the directory as tmap data within the object.
381 ldout(m_cct, 10) << "validating overwrite support" << dendl;
382 bufferlist initial_bl;
383 initial_bl.append("validate");
384 r = m_data_io_ctx.write(RBD_INFO, initial_bl, initial_bl.length(), 0);
385 if (r >= 0) {
386 r = m_data_io_ctx.write(RBD_INFO, bl, bl.length(), 0);
387 }
388 if (r == -EOPNOTSUPP) {
389 lderr(m_cct) << "pool missing required overwrite support" << dendl;
390 complete(-EINVAL);
391 return;
392 } else if (r < 0) {
393 lderr(m_cct) << "failed to validate overwrite support: " << cpp_strerror(r)
394 << dendl;
395 complete(r);
396 return;
397 }
398
399 create_id_object();
400 }
401
402 template<typename I>
403 void CreateRequest<I>::create_id_object() {
404 ldout(m_cct, 20) << dendl;
405
406 librados::ObjectWriteOperation op;
407 op.create(true);
408 cls_client::set_id(&op, m_image_id);
409
410 using klass = CreateRequest<I>;
411 librados::AioCompletion *comp =
412 create_rados_callback<klass, &klass::handle_create_id_object>(this);
413 int r = m_ioctx.aio_operate(m_id_obj, comp, &op);
414 assert(r == 0);
415 comp->release();
416 }
417
418 template<typename I>
419 void CreateRequest<I>::handle_create_id_object(int r) {
420 ldout(m_cct, 20) << "r=" << r << dendl;
421
422 if (r < 0) {
423 lderr(m_cct) << "error creating RBD id object: " << cpp_strerror(r)
424 << dendl;
425 complete(r);
426 return;
427 }
428
429 add_image_to_directory();
430 }
431
432 template<typename I>
433 void CreateRequest<I>::add_image_to_directory() {
434 ldout(m_cct, 20) << dendl;
435
436 librados::ObjectWriteOperation op;
437 cls_client::dir_add_image(&op, m_image_name, m_image_id);
438
439 using klass = CreateRequest<I>;
440 librados::AioCompletion *comp =
441 create_rados_callback<klass, &klass::handle_add_image_to_directory>(this);
442 int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op);
443 assert(r == 0);
444 comp->release();
445 }
446
447 template<typename I>
448 void CreateRequest<I>::handle_add_image_to_directory(int r) {
449 ldout(m_cct, 20) << "r=" << r << dendl;
450
451 if (r < 0) {
452 lderr(m_cct) << "error adding image to directory: " << cpp_strerror(r)
453 << dendl;
454
455 m_r_saved = r;
456 remove_id_object();
457 }
458
459 negotiate_features();
460 }
461
462 template<typename I>
463 void CreateRequest<I>::negotiate_features() {
464 if (!m_negotiate_features) {
465 create_image();
466 return;
467 }
468
469 ldout(m_cct, 20) << dendl;
470
471 librados::ObjectReadOperation op;
472 cls_client::get_all_features_start(&op);
473
474 using klass = CreateRequest<I>;
475 librados::AioCompletion *comp =
476 create_rados_callback<klass, &klass::handle_negotiate_features>(this);
477
478 m_outbl.clear();
479 int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op, &m_outbl);
480 assert(r == 0);
481 comp->release();
482 }
483
484 template<typename I>
485 void CreateRequest<I>::handle_negotiate_features(int r) {
486 ldout(m_cct, 20) << "r=" << r << dendl;
487
488 uint64_t all_features;
489 if (r >= 0) {
490 bufferlist::iterator it = m_outbl.begin();
491 r = cls_client::get_all_features_finish(&it, &all_features);
492 }
493 if (r < 0) {
494 ldout(m_cct, 10) << "error retrieving server supported features set: "
495 << cpp_strerror(r) << dendl;
496 } else if ((m_features & all_features) != m_features) {
497 m_features &= all_features;
498 ldout(m_cct, 10) << "limiting default features set to server supported: "
499 << m_features << dendl;
500 }
501
502 create_image();
503 }
504
505 template<typename I>
506 void CreateRequest<I>::create_image() {
507 ldout(m_cct, 20) << dendl;
508 assert(m_data_pool.empty() || m_data_pool_id != -1);
509
510 ostringstream oss;
511 oss << RBD_DATA_PREFIX;
512 if (m_data_pool_id != -1) {
513 oss << stringify(m_ioctx.get_id()) << ".";
514 }
515 oss << m_image_id;
516 if (oss.str().length() > RBD_MAX_BLOCK_NAME_PREFIX_LENGTH) {
517 lderr(m_cct) << "object prefix '" << oss.str() << "' too large" << dendl;
518 complete(-EINVAL);
519 return;
520 }
521
522 librados::ObjectWriteOperation op;
523 op.create(true);
524 cls_client::create_image(&op, m_size, m_order, m_features, oss.str(),
525 m_data_pool_id);
526
527 using klass = CreateRequest<I>;
528 librados::AioCompletion *comp =
529 create_rados_callback<klass, &klass::handle_create_image>(this);
530 int r = m_ioctx.aio_operate(m_header_obj, comp, &op);
531 assert(r == 0);
532 comp->release();
533 }
534
535 template<typename I>
536 void CreateRequest<I>::handle_create_image(int r) {
537 ldout(m_cct, 20) << "r=" << r << dendl;
538
539 if (r < 0) {
540 lderr(m_cct) << "error writing header: " << cpp_strerror(r) << dendl;
541 m_r_saved = r;
542 remove_from_dir();
543 return;
544 }
545
546 set_stripe_unit_count();
547 }
548
549 template<typename I>
550 void CreateRequest<I>::set_stripe_unit_count() {
551 if ((!m_stripe_unit && !m_stripe_count) ||
552 ((m_stripe_count == 1) && (m_stripe_unit == (1ull << m_order)))) {
553 object_map_resize();
554 return;
555 }
556
557 ldout(m_cct, 20) << dendl;
558
559 librados::ObjectWriteOperation op;
560 cls_client::set_stripe_unit_count(&op, m_stripe_unit, m_stripe_count);
561
562 using klass = CreateRequest<I>;
563 librados::AioCompletion *comp =
564 create_rados_callback<klass, &klass::handle_set_stripe_unit_count>(this);
565 int r = m_ioctx.aio_operate(m_header_obj, comp, &op);
566 assert(r == 0);
567 comp->release();
568 }
569
570 template<typename I>
571 void CreateRequest<I>::handle_set_stripe_unit_count(int r) {
572 ldout(m_cct, 20) << "r=" << r << dendl;
573
574 if (r < 0) {
575 lderr(m_cct) << "error setting stripe unit/count: "
576 << cpp_strerror(r) << dendl;
577 m_r_saved = r;
578 remove_header_object();
579 return;
580 }
581
582 object_map_resize();
583 }
584
585 template<typename I>
586 void CreateRequest<I>::object_map_resize() {
587 if ((m_features & RBD_FEATURE_OBJECT_MAP) == 0) {
588 fetch_mirror_mode();
589 return;
590 }
591
592 ldout(m_cct, 20) << dendl;
593
594 librados::ObjectWriteOperation op;
595 cls_client::object_map_resize(&op, Striper::get_num_objects(m_layout, m_size),
596 OBJECT_NONEXISTENT);
597
598 using klass = CreateRequest<I>;
599 librados::AioCompletion *comp =
600 create_rados_callback<klass, &klass::handle_object_map_resize>(this);
601 int r = m_ioctx.aio_operate(m_objmap_name, comp, &op);
602 assert(r == 0);
603 comp->release();
604 }
605
606 template<typename I>
607 void CreateRequest<I>::handle_object_map_resize(int r) {
608 ldout(m_cct, 20) << "r=" << r << dendl;
609
610 if (r < 0) {
611 lderr(m_cct) << "error creating initial object map: "
612 << cpp_strerror(r) << dendl;
613
614 m_r_saved = r;
615 remove_header_object();
616 return;
617 }
618
619 fetch_mirror_mode();
620 }
621
622 template<typename I>
623 void CreateRequest<I>::fetch_mirror_mode() {
624 if ((m_features & RBD_FEATURE_JOURNALING) == 0) {
625 complete(0);
626 return;
627 }
628
629 ldout(m_cct, 20) << dendl;
630
631 librados::ObjectReadOperation op;
632 cls_client::mirror_mode_get_start(&op);
633
634 using klass = CreateRequest<I>;
635 librados::AioCompletion *comp =
636 create_rados_callback<klass, &klass::handle_fetch_mirror_mode>(this);
637 m_outbl.clear();
638 int r = m_ioctx.aio_operate(RBD_MIRRORING, comp, &op, &m_outbl);
639 assert(r == 0);
640 comp->release();
641 }
642
643 template<typename I>
644 void CreateRequest<I>::handle_fetch_mirror_mode(int r) {
645 ldout(m_cct, 20) << "r=" << r << dendl;
646
647 if ((r < 0) && (r != -ENOENT)) {
648 lderr(m_cct) << "failed to retrieve mirror mode: " << cpp_strerror(r)
649 << dendl;
650
651 m_r_saved = r;
652 remove_object_map();
653 return;
654 }
655
656 cls::rbd::MirrorMode mirror_mode_internal = cls::rbd::MIRROR_MODE_DISABLED;
657 if (r == 0) {
658 bufferlist::iterator it = m_outbl.begin();
659 r = cls_client::mirror_mode_get_finish(&it, &mirror_mode_internal);
660 if (r < 0) {
661 lderr(m_cct) << "Failed to retrieve mirror mode" << dendl;
662
663 m_r_saved = r;
664 remove_object_map();
665 return;
666 }
667 }
668
669 // TODO: remove redundant code...
670 switch (mirror_mode_internal) {
671 case cls::rbd::MIRROR_MODE_DISABLED:
672 case cls::rbd::MIRROR_MODE_IMAGE:
673 case cls::rbd::MIRROR_MODE_POOL:
674 m_mirror_mode = static_cast<rbd_mirror_mode_t>(mirror_mode_internal);
675 break;
676 default:
677 lderr(m_cct) << "Unknown mirror mode ("
678 << static_cast<uint32_t>(mirror_mode_internal) << ")" << dendl;
679 r = -EINVAL;
680 remove_object_map();
681 return;
682 }
683
684 journal_create();
685 }
686
687 template<typename I>
688 void CreateRequest<I>::journal_create() {
689 ldout(m_cct, 20) << dendl;
690
691 using klass = CreateRequest<I>;
692 Context *ctx = create_context_callback<klass, &klass::handle_journal_create>(
693 this);
694
695 librbd::journal::TagData tag_data;
696 tag_data.mirror_uuid = (m_force_non_primary ? m_primary_mirror_uuid :
697 librbd::Journal<I>::LOCAL_MIRROR_UUID);
698
699 librbd::journal::CreateRequest<I> *req =
700 librbd::journal::CreateRequest<I>::create(
701 m_ioctx, m_image_id, m_journal_order, m_journal_splay_width,
702 m_journal_pool, cls::journal::Tag::TAG_CLASS_NEW, tag_data,
703 librbd::Journal<I>::IMAGE_CLIENT_ID, m_op_work_queue, ctx);
704 req->send();
705 }
706
707 template<typename I>
708 void CreateRequest<I>::handle_journal_create(int r) {
709 ldout(m_cct, 20) << "r=" << r << dendl;
710
711 if (r < 0) {
712 lderr(m_cct) << "error creating journal: " << cpp_strerror(r)
713 << dendl;
714
715 m_r_saved = r;
716 remove_object_map();
717 return;
718 }
719
720 mirror_image_enable();
721 }
722
723 template<typename I>
724 void CreateRequest<I>::mirror_image_enable() {
725 if (((m_mirror_mode != RBD_MIRROR_MODE_POOL) && !m_force_non_primary) ||
726 m_skip_mirror_enable) {
727 complete(0);
728 return;
729 }
730
731 ldout(m_cct, 20) << dendl;
732 auto ctx = create_context_callback<
733 CreateRequest<I>, &CreateRequest<I>::handle_mirror_image_enable>(this);
734 auto req = mirror::EnableRequest<I>::create(m_ioctx, m_image_id,
735 m_non_primary_global_image_id,
736 m_op_work_queue, ctx);
737 req->send();
738 }
739
740 template<typename I>
741 void CreateRequest<I>::handle_mirror_image_enable(int r) {
742 ldout(m_cct, 20) << "r=" << r << dendl;
743
744 if (r < 0) {
745 lderr(m_cct) << "cannot enable mirroring: " << cpp_strerror(r)
746 << dendl;
747
748 m_r_saved = r;
749 journal_remove();
750 return;
751 }
752
753 complete(0);
754 }
755
756 template<typename I>
757 void CreateRequest<I>::complete(int r) {
758 ldout(m_cct, 20) << dendl;
759
760 if (r == 0) {
761 ldout(m_cct, 20) << "done." << dendl;
762 }
763
764 m_data_io_ctx.close();
765 m_on_finish->complete(r);
766 delete this;
767 }
768
769 // cleanup
770 template<typename I>
771 void CreateRequest<I>::journal_remove() {
772 if ((m_features & RBD_FEATURE_JOURNALING) == 0) {
773 remove_object_map();
774 return;
775 }
776
777 ldout(m_cct, 20) << dendl;
778
779 using klass = CreateRequest<I>;
780 Context *ctx = create_context_callback<klass, &klass::handle_journal_remove>(
781 this);
782
783 librbd::journal::RemoveRequest<I> *req =
784 librbd::journal::RemoveRequest<I>::create(
785 m_ioctx, m_image_id, librbd::Journal<I>::IMAGE_CLIENT_ID, m_op_work_queue,
786 ctx);
787 req->send();
788 }
789
790 template<typename I>
791 void CreateRequest<I>::handle_journal_remove(int r) {
792 ldout(m_cct, 20) << "r=" << r << dendl;
793
794 if (r < 0) {
795 lderr(m_cct) << "error cleaning up journal after creation failed: "
796 << cpp_strerror(r) << dendl;
797 }
798
799 remove_object_map();
800 }
801
802 template<typename I>
803 void CreateRequest<I>::remove_object_map() {
804 if ((m_features & RBD_FEATURE_OBJECT_MAP) == 0) {
805 remove_header_object();
806 return;
807 }
808
809 ldout(m_cct, 20) << dendl;
810
811 using klass = CreateRequest<I>;
812 librados::AioCompletion *comp =
813 create_rados_callback<klass, &klass::handle_remove_object_map>(this);
814 int r = m_ioctx.aio_remove(m_objmap_name, comp);
815 assert(r == 0);
816 comp->release();
817 }
818
819 template<typename I>
820 void CreateRequest<I>::handle_remove_object_map(int r) {
821 ldout(m_cct, 20) << "r=" << r << dendl;
822
823 if (r < 0) {
824 lderr(m_cct) << "error cleaning up object map after creation failed: "
825 << cpp_strerror(r) << dendl;
826 }
827
828 remove_header_object();
829 }
830
831 template<typename I>
832 void CreateRequest<I>::remove_header_object() {
833 ldout(m_cct, 20) << dendl;
834
835 using klass = CreateRequest<I>;
836 librados::AioCompletion *comp =
837 create_rados_callback<klass, &klass::handle_remove_header_object>(this);
838 int r = m_ioctx.aio_remove(m_header_obj, comp);
839 assert(r == 0);
840 comp->release();
841 }
842
843 template<typename I>
844 void CreateRequest<I>::handle_remove_header_object(int r) {
845 ldout(m_cct, 20) << "r=" << r << dendl;
846
847 if (r < 0) {
848 lderr(m_cct) << "error cleaning up image header after creation failed: "
849 << cpp_strerror(r) << dendl;
850 }
851
852 remove_from_dir();
853 }
854
855 template<typename I>
856 void CreateRequest<I>::remove_from_dir() {
857 ldout(m_cct, 20) << dendl;
858
859 librados::ObjectWriteOperation op;
860 cls_client::dir_remove_image(&op, m_image_name, m_image_id);
861
862 using klass = CreateRequest<I>;
863 librados::AioCompletion *comp =
864 create_rados_callback<klass, &klass::handle_remove_from_dir>(this);
865 int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op);
866 assert(r == 0);
867 comp->release();
868 }
869
870 template<typename I>
871 void CreateRequest<I>::handle_remove_from_dir(int r) {
872 ldout(m_cct, 20) << "r=" << r << dendl;
873
874 if (r < 0) {
875 lderr(m_cct) << "error cleaning up image from rbd_directory object "
876 << "after creation failed: " << cpp_strerror(r) << dendl;
877 }
878
879 remove_id_object();
880 }
881
882 template<typename I>
883 void CreateRequest<I>::remove_id_object() {
884 ldout(m_cct, 20) << dendl;
885
886 using klass = CreateRequest<I>;
887 librados::AioCompletion *comp =
888 create_rados_callback<klass, &klass::handle_remove_id_object>(this);
889 int r = m_ioctx.aio_remove(m_id_obj, comp);
890 assert(r == 0);
891 comp->release();
892 }
893
894 template<typename I>
895 void CreateRequest<I>::handle_remove_id_object(int r) {
896 ldout(m_cct, 20) << "r=" << r << dendl;
897
898 if (r < 0) {
899 lderr(m_cct) << "error cleaning up id object after creation failed: "
900 << cpp_strerror(r) << dendl;
901 }
902
903 complete(m_r_saved);
904 }
905
906 } //namespace image
907 } //namespace librbd
908
909 template class librbd::image::CreateRequest<librbd::ImageCtx>;