]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/Operations.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / librbd / Operations.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "cls/rbd/cls_rbd_types.h"
5#include "librbd/Operations.h"
6#include "common/dout.h"
7#include "common/errno.h"
8#include "common/WorkQueue.h"
9
10#include "librbd/ExclusiveLock.h"
11#include "librbd/ImageCtx.h"
12#include "librbd/ImageState.h"
13#include "librbd/ImageWatcher.h"
14#include "librbd/ObjectMap.h"
15#include "librbd/Utils.h"
16#include "librbd/journal/DisabledPolicy.h"
17#include "librbd/journal/StandardPolicy.h"
18#include "librbd/operation/DisableFeaturesRequest.h"
19#include "librbd/operation/EnableFeaturesRequest.h"
20#include "librbd/operation/FlattenRequest.h"
21#include "librbd/operation/MetadataRemoveRequest.h"
22#include "librbd/operation/MetadataSetRequest.h"
23#include "librbd/operation/ObjectMapIterate.h"
24#include "librbd/operation/RebuildObjectMapRequest.h"
25#include "librbd/operation/RenameRequest.h"
26#include "librbd/operation/ResizeRequest.h"
27#include "librbd/operation/SnapshotCreateRequest.h"
28#include "librbd/operation/SnapshotProtectRequest.h"
29#include "librbd/operation/SnapshotRemoveRequest.h"
30#include "librbd/operation/SnapshotRenameRequest.h"
31#include "librbd/operation/SnapshotRollbackRequest.h"
32#include "librbd/operation/SnapshotUnprotectRequest.h"
33#include "librbd/operation/SnapshotLimitRequest.h"
34#include <set>
35#include <boost/bind.hpp>
36#include <boost/scope_exit.hpp>
37
38#define dout_subsys ceph_subsys_rbd
39#undef dout_prefix
40#define dout_prefix *_dout << "librbd::Operations: "
41
42namespace librbd {
43
44namespace {
45
46template <typename I>
47struct C_NotifyUpdate : public Context {
48 I &image_ctx;
49 Context *on_finish;
50 bool notified = false;
51
52 C_NotifyUpdate(I &image_ctx, Context *on_finish)
53 : image_ctx(image_ctx), on_finish(on_finish) {
54 }
55
56 void complete(int r) override {
57 CephContext *cct = image_ctx.cct;
58 if (notified) {
59 if (r == -ETIMEDOUT) {
60 // don't fail the op if a peer fails to get the update notification
61 lderr(cct) << "update notification timed-out" << dendl;
62 r = 0;
63 } else if (r == -ENOENT) {
64 // don't fail if header is missing (e.g. v1 image rename)
65 ldout(cct, 5) << "update notification on missing header" << dendl;
66 r = 0;
67 } else if (r < 0) {
68 lderr(cct) << "update notification failed: " << cpp_strerror(r)
69 << dendl;
70 }
71 Context::complete(r);
72 return;
73 }
74
75 if (r < 0) {
76 // op failed -- no need to send update notification
77 Context::complete(r);
78 return;
79 }
80
81 notified = true;
82 image_ctx.notify_update(this);
83 }
84 void finish(int r) override {
85 on_finish->complete(r);
86 }
87};
88
89template <typename I>
90struct C_InvokeAsyncRequest : public Context {
91 /**
92 * @verbatim
93 *
94 * <start>
95 * |
96 * . . . . . . | . . . . . . . . . . . . . . . . . .
97 * . . | . .
98 * . v v v .
99 * . REFRESH_IMAGE (skip if not needed) .
100 * . | .
101 * . v .
102 * . ACQUIRE_LOCK (skip if exclusive lock .
103 * . | disabled or has lock) .
104 * . | .
105 * . /--------/ \--------\ . . . . . . . . . . . . .
106 * . | | .
107 * . v v .
108 * LOCAL_REQUEST REMOTE_REQUEST
109 * | |
110 * | |
111 * \--------\ /--------/
112 * |
113 * v
114 * <finish>
115 *
116 * @endverbatim
117 */
118
119 I &image_ctx;
120 std::string request_type;
121 bool permit_snapshot;
122 boost::function<void(Context*)> local;
123 boost::function<void(Context*)> remote;
124 std::set<int> filter_error_codes;
125 Context *on_finish;
126 bool request_lock = false;
127
128 C_InvokeAsyncRequest(I &image_ctx, const std::string& request_type,
129 bool permit_snapshot,
130 const boost::function<void(Context*)>& local,
131 const boost::function<void(Context*)>& remote,
132 const std::set<int> &filter_error_codes,
133 Context *on_finish)
134 : image_ctx(image_ctx), request_type(request_type),
135 permit_snapshot(permit_snapshot), local(local), remote(remote),
136 filter_error_codes(filter_error_codes), on_finish(on_finish) {
137 }
138
139 void send() {
140 send_refresh_image();
141 }
142
143 void send_refresh_image() {
144 if (!image_ctx.state->is_refresh_required()) {
145 send_acquire_exclusive_lock();
146 return;
147 }
148
149 CephContext *cct = image_ctx.cct;
150 ldout(cct, 20) << __func__ << dendl;
151
152 Context *ctx = util::create_context_callback<
153 C_InvokeAsyncRequest<I>,
154 &C_InvokeAsyncRequest<I>::handle_refresh_image>(this);
155 image_ctx.state->refresh(ctx);
156 }
157
158 void handle_refresh_image(int r) {
159 CephContext *cct = image_ctx.cct;
160 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
161
162 if (r < 0) {
163 lderr(cct) << "failed to refresh image: " << cpp_strerror(r) << dendl;
164 complete(r);
165 return;
166 }
167
168 send_acquire_exclusive_lock();
169 }
170
171 void send_acquire_exclusive_lock() {
172 // context can complete before owner_lock is unlocked
173 RWLock &owner_lock(image_ctx.owner_lock);
174 owner_lock.get_read();
175 image_ctx.snap_lock.get_read();
176 if (image_ctx.read_only ||
177 (!permit_snapshot && image_ctx.snap_id != CEPH_NOSNAP)) {
178 image_ctx.snap_lock.put_read();
179 owner_lock.put_read();
180 complete(-EROFS);
181 return;
182 }
183 image_ctx.snap_lock.put_read();
184
185 if (image_ctx.exclusive_lock == nullptr) {
186 send_local_request();
187 owner_lock.put_read();
188 return;
189 } else if (image_ctx.image_watcher == nullptr) {
190 owner_lock.put_read();
191 complete(-EROFS);
192 return;
193 }
194
195 int r;
196 if (image_ctx.exclusive_lock->is_lock_owner() &&
197 image_ctx.exclusive_lock->accept_requests(&r)) {
198 send_local_request();
199 owner_lock.put_read();
200 return;
201 }
202
203 CephContext *cct = image_ctx.cct;
204 ldout(cct, 20) << __func__ << dendl;
205
206 Context *ctx = util::create_async_context_callback(
207 image_ctx, util::create_context_callback<
208 C_InvokeAsyncRequest<I>,
209 &C_InvokeAsyncRequest<I>::handle_acquire_exclusive_lock>(this));
210
211 if (request_lock) {
212 // current lock owner doesn't support op -- try to perform
213 // the action locally
214 request_lock = false;
215 image_ctx.exclusive_lock->acquire_lock(ctx);
216 } else {
217 image_ctx.exclusive_lock->try_acquire_lock(ctx);
218 }
219 owner_lock.put_read();
220 }
221
222 void handle_acquire_exclusive_lock(int r) {
223 CephContext *cct = image_ctx.cct;
224 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
225
226 if (r < 0) {
227 complete(-EROFS);
228 return;
229 }
230
231 // context can complete before owner_lock is unlocked
232 RWLock &owner_lock(image_ctx.owner_lock);
233 owner_lock.get_read();
234 if (image_ctx.exclusive_lock->is_lock_owner()) {
235 send_local_request();
236 owner_lock.put_read();
237 return;
238 }
239
240 send_remote_request();
241 owner_lock.put_read();
242 }
243
244 void send_remote_request() {
245 assert(image_ctx.owner_lock.is_locked());
246
247 CephContext *cct = image_ctx.cct;
248 ldout(cct, 20) << __func__ << dendl;
249
250 Context *ctx = util::create_context_callback<
251 C_InvokeAsyncRequest<I>, &C_InvokeAsyncRequest<I>::handle_remote_request>(
252 this);
253 remote(ctx);
254 }
255
256 void handle_remote_request(int r) {
257 CephContext *cct = image_ctx.cct;
258 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
259
260 if (r == -EOPNOTSUPP) {
261 ldout(cct, 5) << request_type << " not supported by current lock owner"
262 << dendl;
263 request_lock = true;
264 send_refresh_image();
265 return;
266 } else if (r != -ETIMEDOUT && r != -ERESTART) {
267 image_ctx.state->handle_update_notification();
268
269 complete(r);
270 return;
271 }
272
273 ldout(cct, 5) << request_type << " timed out notifying lock owner"
274 << dendl;
275 send_refresh_image();
276 }
277
278 void send_local_request() {
279 assert(image_ctx.owner_lock.is_locked());
280
281 CephContext *cct = image_ctx.cct;
282 ldout(cct, 20) << __func__ << dendl;
283
284 Context *ctx = util::create_async_context_callback(
285 image_ctx, util::create_context_callback<
286 C_InvokeAsyncRequest<I>,
287 &C_InvokeAsyncRequest<I>::handle_local_request>(this));
288 local(ctx);
289 }
290
291 void handle_local_request(int r) {
292 CephContext *cct = image_ctx.cct;
293 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
294
295 if (r == -ERESTART) {
296 send_refresh_image();
297 return;
298 }
299 complete(r);
300 }
301
302 void finish(int r) override {
303 if (filter_error_codes.count(r) != 0) {
304 r = 0;
305 }
306 on_finish->complete(r);
307 }
308};
309
310template <typename I>
311bool needs_invalidate(I& image_ctx, uint64_t object_no,
312 uint8_t current_state, uint8_t new_state) {
313 if ( (current_state == OBJECT_EXISTS ||
314 current_state == OBJECT_EXISTS_CLEAN) &&
315 (new_state == OBJECT_NONEXISTENT ||
316 new_state == OBJECT_PENDING)) {
317 return false;
318 }
319 return true;
320}
321
322} // anonymous namespace
323
324template <typename I>
325Operations<I>::Operations(I &image_ctx)
326 : m_image_ctx(image_ctx), m_async_request_seq(0) {
327}
328
329template <typename I>
330int Operations<I>::flatten(ProgressContext &prog_ctx) {
331 CephContext *cct = m_image_ctx.cct;
332 ldout(cct, 20) << "flatten" << dendl;
333
334 int r = m_image_ctx.state->refresh_if_required();
335 if (r < 0) {
336 return r;
337 }
338
339 if (m_image_ctx.read_only) {
340 return -EROFS;
341 }
342
343 {
344 RWLock::RLocker parent_locker(m_image_ctx.parent_lock);
345 if (m_image_ctx.parent_md.spec.pool_id == -1) {
346 lderr(cct) << "image has no parent" << dendl;
347 return -EINVAL;
348 }
349 }
350
351 uint64_t request_id = ++m_async_request_seq;
352 r = invoke_async_request("flatten", false,
353 boost::bind(&Operations<I>::execute_flatten, this,
354 boost::ref(prog_ctx), _1),
355 boost::bind(&ImageWatcher<I>::notify_flatten,
356 m_image_ctx.image_watcher, request_id,
357 boost::ref(prog_ctx), _1));
358
359 if (r < 0 && r != -EINVAL) {
360 return r;
361 }
362 ldout(cct, 20) << "flatten finished" << dendl;
363 return 0;
364}
365
366template <typename I>
367void Operations<I>::execute_flatten(ProgressContext &prog_ctx,
368 Context *on_finish) {
369 assert(m_image_ctx.owner_lock.is_locked());
370 assert(m_image_ctx.exclusive_lock == nullptr ||
371 m_image_ctx.exclusive_lock->is_lock_owner());
372
373 CephContext *cct = m_image_ctx.cct;
374 ldout(cct, 20) << "flatten" << dendl;
375
376 if (m_image_ctx.read_only) {
377 on_finish->complete(-EROFS);
378 return;
379 }
380
381 m_image_ctx.snap_lock.get_read();
382 m_image_ctx.parent_lock.get_read();
383
384 // can't flatten a non-clone
385 if (m_image_ctx.parent_md.spec.pool_id == -1) {
386 lderr(cct) << "image has no parent" << dendl;
387 m_image_ctx.parent_lock.put_read();
388 m_image_ctx.snap_lock.put_read();
389 on_finish->complete(-EINVAL);
390 return;
391 }
392 if (m_image_ctx.snap_id != CEPH_NOSNAP) {
393 lderr(cct) << "snapshots cannot be flattened" << dendl;
394 m_image_ctx.parent_lock.put_read();
395 m_image_ctx.snap_lock.put_read();
396 on_finish->complete(-EROFS);
397 return;
398 }
399
400 ::SnapContext snapc = m_image_ctx.snapc;
401 assert(m_image_ctx.parent != NULL);
402
403 uint64_t overlap;
404 int r = m_image_ctx.get_parent_overlap(CEPH_NOSNAP, &overlap);
405 assert(r == 0);
406 assert(overlap <= m_image_ctx.size);
407
408 uint64_t object_size = m_image_ctx.get_object_size();
409 uint64_t overlap_objects = Striper::get_num_objects(m_image_ctx.layout,
410 overlap);
411
412 m_image_ctx.parent_lock.put_read();
413 m_image_ctx.snap_lock.put_read();
414
415 operation::FlattenRequest<I> *req = new operation::FlattenRequest<I>(
416 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), object_size,
417 overlap_objects, snapc, prog_ctx);
418 req->send();
419}
420
421template <typename I>
422int Operations<I>::rebuild_object_map(ProgressContext &prog_ctx) {
423 CephContext *cct = m_image_ctx.cct;
424 ldout(cct, 10) << "rebuild_object_map" << dendl;
425
426 int r = m_image_ctx.state->refresh_if_required();
427 if (r < 0) {
428 return r;
429 }
430
431 uint64_t request_id = ++m_async_request_seq;
432 r = invoke_async_request("rebuild object map", true,
433 boost::bind(&Operations<I>::execute_rebuild_object_map,
434 this, boost::ref(prog_ctx), _1),
435 boost::bind(&ImageWatcher<I>::notify_rebuild_object_map,
436 m_image_ctx.image_watcher, request_id,
437 boost::ref(prog_ctx), _1));
438
439 ldout(cct, 10) << "rebuild object map finished" << dendl;
440 if (r < 0) {
441 return r;
442 }
443 return 0;
444}
445
446template <typename I>
447void Operations<I>::execute_rebuild_object_map(ProgressContext &prog_ctx,
448 Context *on_finish) {
449 assert(m_image_ctx.owner_lock.is_locked());
450 assert(m_image_ctx.exclusive_lock == nullptr ||
451 m_image_ctx.exclusive_lock->is_lock_owner());
452
453 CephContext *cct = m_image_ctx.cct;
454 ldout(cct, 5) << this << " " << __func__ << dendl;
455
456 if (m_image_ctx.read_only) {
457 on_finish->complete(-EROFS);
458 return;
459 }
460 if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP)) {
461 lderr(cct) << "image must support object-map feature" << dendl;
462 on_finish->complete(-EINVAL);
463 return;
464 }
465
466 operation::RebuildObjectMapRequest<I> *req =
467 new operation::RebuildObjectMapRequest<I>(
468 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), prog_ctx);
469 req->send();
470}
471
472template <typename I>
473int Operations<I>::check_object_map(ProgressContext &prog_ctx) {
474 CephContext *cct = m_image_ctx.cct;
475 ldout(cct, 5) << this << " " << __func__ << dendl;
476 int r = m_image_ctx.state->refresh_if_required();
477 if (r < 0) {
478 return r;
479 }
480
481 r = invoke_async_request("check object map", true,
482 boost::bind(&Operations<I>::check_object_map, this,
483 boost::ref(prog_ctx), _1),
484 [] (Context *c) { c->complete(-EOPNOTSUPP); });
485
486 return r;
487}
488
489template <typename I>
490void Operations<I>::object_map_iterate(ProgressContext &prog_ctx,
491 operation::ObjectIterateWork<I> handle_mismatch,
492 Context *on_finish) {
493 assert(m_image_ctx.owner_lock.is_locked());
494 assert(m_image_ctx.exclusive_lock == nullptr ||
495 m_image_ctx.exclusive_lock->is_lock_owner());
496
497 if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP)) {
498 on_finish->complete(-EINVAL);
499 return;
500 }
501
502 operation::ObjectMapIterateRequest<I> *req =
503 new operation::ObjectMapIterateRequest<I>(m_image_ctx, on_finish,
504 prog_ctx, handle_mismatch);
505 req->send();
506}
507
508template <typename I>
509void Operations<I>::check_object_map(ProgressContext &prog_ctx,
510 Context *on_finish) {
511 object_map_iterate(prog_ctx, needs_invalidate, on_finish);
512}
513
514template <typename I>
515int Operations<I>::rename(const char *dstname) {
516 CephContext *cct = m_image_ctx.cct;
517 ldout(cct, 5) << this << " " << __func__ << ": dest_name=" << dstname
518 << dendl;
519
520 int r = librbd::detect_format(m_image_ctx.md_ctx, dstname, NULL, NULL);
521 if (r < 0 && r != -ENOENT) {
522 lderr(cct) << "error checking for existing image called "
523 << dstname << ":" << cpp_strerror(r) << dendl;
524 return r;
525 }
526 if (r == 0) {
527 lderr(cct) << "rbd image " << dstname << " already exists" << dendl;
528 return -EEXIST;
529 }
530
531 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
532 r = invoke_async_request("rename", true,
533 boost::bind(&Operations<I>::execute_rename, this,
534 dstname, _1),
535 boost::bind(&ImageWatcher<I>::notify_rename,
536 m_image_ctx.image_watcher, dstname,
537 _1));
538 if (r < 0 && r != -EEXIST) {
539 return r;
540 }
541 } else {
542 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
543 C_SaferCond cond_ctx;
544 execute_rename(dstname, &cond_ctx);
545
546 r = cond_ctx.wait();
547 if (r < 0) {
548 return r;
549 }
550 }
551
552 m_image_ctx.set_image_name(dstname);
553 return 0;
554}
555
556template <typename I>
557void Operations<I>::execute_rename(const std::string &dest_name,
558 Context *on_finish) {
559 assert(m_image_ctx.owner_lock.is_locked());
560 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
561 assert(m_image_ctx.exclusive_lock == nullptr ||
562 m_image_ctx.exclusive_lock->is_lock_owner());
563 }
564
565 m_image_ctx.snap_lock.get_read();
566 if (m_image_ctx.name == dest_name) {
567 m_image_ctx.snap_lock.put_read();
568 on_finish->complete(-EEXIST);
569 return;
570 }
571 m_image_ctx.snap_lock.put_read();
572
573 CephContext *cct = m_image_ctx.cct;
574 ldout(cct, 5) << this << " " << __func__ << ": dest_name=" << dest_name
575 << dendl;
576
577 if (m_image_ctx.old_format) {
578 // unregister watch before and register back after rename
579 on_finish = new C_NotifyUpdate<I>(m_image_ctx, on_finish);
580 on_finish = new FunctionContext([this, on_finish](int r) {
581 if (m_image_ctx.old_format) {
582 m_image_ctx.image_watcher->set_oid(m_image_ctx.header_oid);
583 }
584 m_image_ctx.image_watcher->register_watch(on_finish);
585 });
586 on_finish = new FunctionContext([this, dest_name, on_finish](int r) {
587 operation::RenameRequest<I> *req = new operation::RenameRequest<I>(
588 m_image_ctx, on_finish, dest_name);
589 req->send();
590 });
591 m_image_ctx.image_watcher->unregister_watch(on_finish);
592 return;
593 }
594 operation::RenameRequest<I> *req = new operation::RenameRequest<I>(
595 m_image_ctx, on_finish, dest_name);
596 req->send();
597}
598
599template <typename I>
600int Operations<I>::resize(uint64_t size, bool allow_shrink, ProgressContext& prog_ctx) {
601 CephContext *cct = m_image_ctx.cct;
602
603 m_image_ctx.snap_lock.get_read();
604 ldout(cct, 5) << this << " " << __func__ << ": "
605 << "size=" << m_image_ctx.size << ", "
606 << "new_size=" << size << dendl;
607 m_image_ctx.snap_lock.put_read();
608
609 int r = m_image_ctx.state->refresh_if_required();
610 if (r < 0) {
611 return r;
612 }
613
614 if (m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP) &&
615 !ObjectMap<>::is_compatible(m_image_ctx.layout, size)) {
616 lderr(cct) << "New size not compatible with object map" << dendl;
617 return -EINVAL;
618 }
619
620 uint64_t request_id = ++m_async_request_seq;
621 r = invoke_async_request("resize", false,
622 boost::bind(&Operations<I>::execute_resize, this,
623 size, allow_shrink, boost::ref(prog_ctx), _1, 0),
624 boost::bind(&ImageWatcher<I>::notify_resize,
625 m_image_ctx.image_watcher, request_id,
626 size, allow_shrink, boost::ref(prog_ctx), _1));
627
628 m_image_ctx.perfcounter->inc(l_librbd_resize);
629 ldout(cct, 2) << "resize finished" << dendl;
630 return r;
631}
632
633template <typename I>
634void Operations<I>::execute_resize(uint64_t size, bool allow_shrink, ProgressContext &prog_ctx,
635 Context *on_finish,
636 uint64_t journal_op_tid) {
637 assert(m_image_ctx.owner_lock.is_locked());
638 assert(m_image_ctx.exclusive_lock == nullptr ||
639 m_image_ctx.exclusive_lock->is_lock_owner());
640
641 CephContext *cct = m_image_ctx.cct;
642 m_image_ctx.snap_lock.get_read();
643 ldout(cct, 5) << this << " " << __func__ << ": "
644 << "size=" << m_image_ctx.size << ", "
645 << "new_size=" << size << dendl;
646
647 if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
648 m_image_ctx.snap_lock.put_read();
649 on_finish->complete(-EROFS);
650 return;
651 } else if (m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP,
652 m_image_ctx.snap_lock) &&
653 !ObjectMap<>::is_compatible(m_image_ctx.layout, size)) {
654 m_image_ctx.snap_lock.put_read();
655 on_finish->complete(-EINVAL);
656 return;
657 }
658 m_image_ctx.snap_lock.put_read();
659
660 operation::ResizeRequest<I> *req = new operation::ResizeRequest<I>(
661 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), size, allow_shrink,
662 prog_ctx, journal_op_tid, false);
663 req->send();
664}
665
666template <typename I>
667int Operations<I>::snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
668 const char *snap_name) {
669 if (m_image_ctx.read_only) {
670 return -EROFS;
671 }
672
673 int r = m_image_ctx.state->refresh_if_required();
674 if (r < 0) {
675 return r;
676 }
677
678 C_SaferCond ctx;
679 snap_create(snap_namespace, snap_name, &ctx);
680 r = ctx.wait();
681
682 if (r < 0) {
683 return r;
684 }
685
686 m_image_ctx.perfcounter->inc(l_librbd_snap_create);
687 return r;
688}
689
690template <typename I>
691void Operations<I>::snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
692 const char *snap_name,
693 Context *on_finish) {
694 CephContext *cct = m_image_ctx.cct;
695 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
696 << dendl;
697
698 if (m_image_ctx.read_only) {
699 on_finish->complete(-EROFS);
700 return;
701 }
702
703 m_image_ctx.snap_lock.get_read();
704 if (m_image_ctx.get_snap_id(snap_namespace, snap_name) != CEPH_NOSNAP) {
705 m_image_ctx.snap_lock.put_read();
706 on_finish->complete(-EEXIST);
707 return;
708 }
709 m_image_ctx.snap_lock.put_read();
710
711 C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
712 m_image_ctx, "snap_create", true,
713 boost::bind(&Operations<I>::execute_snap_create, this, snap_namespace, snap_name,
714 _1, 0, false),
715 boost::bind(&ImageWatcher<I>::notify_snap_create, m_image_ctx.image_watcher,
716 snap_namespace, snap_name, _1),
717 {-EEXIST}, on_finish);
718 req->send();
719}
720
721template <typename I>
722void Operations<I>::execute_snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
723 const std::string &snap_name,
724 Context *on_finish,
725 uint64_t journal_op_tid,
726 bool skip_object_map) {
727 assert(m_image_ctx.owner_lock.is_locked());
728 assert(m_image_ctx.exclusive_lock == nullptr ||
729 m_image_ctx.exclusive_lock->is_lock_owner());
730
731 CephContext *cct = m_image_ctx.cct;
732 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
733 << dendl;
734
735 m_image_ctx.snap_lock.get_read();
736 if (m_image_ctx.get_snap_id(snap_namespace, snap_name) != CEPH_NOSNAP) {
737 m_image_ctx.snap_lock.put_read();
738 on_finish->complete(-EEXIST);
739 return;
740 }
741 m_image_ctx.snap_lock.put_read();
742
743 operation::SnapshotCreateRequest<I> *req =
744 new operation::SnapshotCreateRequest<I>(
745 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish),
746 snap_namespace, snap_name, journal_op_tid, skip_object_map);
747 req->send();
748}
749
750template <typename I>
751int Operations<I>::snap_rollback(const cls::rbd::SnapshotNamespace& snap_namespace,
752 const char *snap_name,
753 ProgressContext& prog_ctx) {
754 CephContext *cct = m_image_ctx.cct;
755 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
756 << dendl;
757
758 int r = m_image_ctx.state->refresh_if_required();
759 if (r < 0)
760 return r;
761
762 RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
763 {
764 // need to drop snap_lock before invalidating cache
765 RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
766 if (!m_image_ctx.snap_exists) {
767 return -ENOENT;
768 }
769
770 if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
771 return -EROFS;
772 }
773
774 uint64_t snap_id = m_image_ctx.get_snap_id(snap_namespace, snap_name);
775 if (snap_id == CEPH_NOSNAP) {
776 lderr(cct) << "No such snapshot found." << dendl;
777 return -ENOENT;
778 }
779 }
780
781 r = prepare_image_update();
782 if (r < 0) {
783 return -EROFS;
784 }
785 if (m_image_ctx.exclusive_lock != nullptr &&
786 !m_image_ctx.exclusive_lock->is_lock_owner()) {
787 return -EROFS;
788 }
789
790 C_SaferCond cond_ctx;
791 execute_snap_rollback(snap_namespace, snap_name, prog_ctx, &cond_ctx);
792 r = cond_ctx.wait();
793 if (r < 0) {
794 return r;
795 }
796
797 m_image_ctx.perfcounter->inc(l_librbd_snap_rollback);
798 return r;
799}
800
801template <typename I>
802void Operations<I>::execute_snap_rollback(const cls::rbd::SnapshotNamespace& snap_namespace,
803 const std::string &snap_name,
804 ProgressContext& prog_ctx,
805 Context *on_finish) {
806 assert(m_image_ctx.owner_lock.is_locked());
807 CephContext *cct = m_image_ctx.cct;
808 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
809 << dendl;
810
811 m_image_ctx.snap_lock.get_read();
812 uint64_t snap_id = m_image_ctx.get_snap_id(snap_namespace, snap_name);
813 if (snap_id == CEPH_NOSNAP) {
814 lderr(cct) << "No such snapshot found." << dendl;
815 m_image_ctx.snap_lock.put_read();
816 on_finish->complete(-ENOENT);
817 return;
818 }
819
820 uint64_t new_size = m_image_ctx.get_image_size(snap_id);
821 m_image_ctx.snap_lock.put_read();
822
823 // async mode used for journal replay
824 operation::SnapshotRollbackRequest<I> *request =
825 new operation::SnapshotRollbackRequest<I>(
826 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), snap_namespace, snap_name,
827 snap_id, new_size, prog_ctx);
828 request->send();
829}
830
831template <typename I>
832int Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespace,
833 const char *snap_name) {
834 if (m_image_ctx.read_only) {
835 return -EROFS;
836 }
837
838 int r = m_image_ctx.state->refresh_if_required();
839 if (r < 0) {
840 return r;
841 }
842
843 C_SaferCond ctx;
844 snap_remove(snap_namespace, snap_name, &ctx);
845 r = ctx.wait();
846
847 if (r < 0) {
848 return r;
849 }
850
851 m_image_ctx.perfcounter->inc(l_librbd_snap_remove);
852 return 0;
853}
854
855template <typename I>
856void Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespace,
857 const char *snap_name,
858 Context *on_finish) {
859 CephContext *cct = m_image_ctx.cct;
860 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
861 << dendl;
862
863 if (m_image_ctx.read_only) {
864 on_finish->complete(-EROFS);
865 return;
866 }
867
868 // quickly filter out duplicate ops
869 m_image_ctx.snap_lock.get_read();
870 if (m_image_ctx.get_snap_id(snap_namespace, snap_name) == CEPH_NOSNAP) {
871 m_image_ctx.snap_lock.put_read();
872 on_finish->complete(-ENOENT);
873 return;
874 }
875
876 bool proxy_op = ((m_image_ctx.features & RBD_FEATURE_FAST_DIFF) != 0 ||
877 (m_image_ctx.features & RBD_FEATURE_JOURNALING) != 0);
878 m_image_ctx.snap_lock.put_read();
879
880 if (proxy_op) {
881 C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
882 m_image_ctx, "snap_remove", true,
883 boost::bind(&Operations<I>::execute_snap_remove, this, snap_namespace, snap_name, _1),
884 boost::bind(&ImageWatcher<I>::notify_snap_remove, m_image_ctx.image_watcher,
885 snap_namespace, snap_name, _1),
886 {-ENOENT}, on_finish);
887 req->send();
888 } else {
889 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
890 execute_snap_remove(snap_namespace, snap_name, on_finish);
891 }
892}
893
894template <typename I>
895void Operations<I>::execute_snap_remove(const cls::rbd::SnapshotNamespace& snap_namespace,
896 const std::string &snap_name,
897 Context *on_finish) {
898 assert(m_image_ctx.owner_lock.is_locked());
899 {
900 if ((m_image_ctx.features & RBD_FEATURE_FAST_DIFF) != 0) {
901 assert(m_image_ctx.exclusive_lock == nullptr ||
902 m_image_ctx.exclusive_lock->is_lock_owner());
903 }
904 }
905
906 CephContext *cct = m_image_ctx.cct;
907 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
908 << dendl;
909
910 m_image_ctx.snap_lock.get_read();
911 uint64_t snap_id = m_image_ctx.get_snap_id(snap_namespace, snap_name);
912 if (snap_id == CEPH_NOSNAP) {
913 lderr(m_image_ctx.cct) << "No such snapshot found." << dendl;
914 m_image_ctx.snap_lock.put_read();
915 on_finish->complete(-ENOENT);
916 return;
917 }
918
919 bool is_protected;
920 int r = m_image_ctx.is_snap_protected(snap_id, &is_protected);
921 if (r < 0) {
922 m_image_ctx.snap_lock.put_read();
923 on_finish->complete(r);
924 return;
925 } else if (is_protected) {
926 lderr(m_image_ctx.cct) << "snapshot is protected" << dendl;
927 m_image_ctx.snap_lock.put_read();
928 on_finish->complete(-EBUSY);
929 return;
930 }
931 m_image_ctx.snap_lock.put_read();
932
933 operation::SnapshotRemoveRequest<I> *req =
934 new operation::SnapshotRemoveRequest<I>(
935 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish),
936 snap_namespace, snap_name, snap_id);
937 req->send();
938}
939
940template <typename I>
941int Operations<I>::snap_rename(const char *srcname, const char *dstname) {
942 CephContext *cct = m_image_ctx.cct;
943 ldout(cct, 5) << this << " " << __func__ << ": "
944 << "snap_name=" << srcname << ", "
945 << "new_snap_name=" << dstname << dendl;
946
947 snapid_t snap_id;
948 if (m_image_ctx.read_only) {
949 return -EROFS;
950 }
951
952 int r = m_image_ctx.state->refresh_if_required();
953 if (r < 0)
954 return r;
955
956 {
957 RWLock::RLocker l(m_image_ctx.snap_lock);
958 snap_id = m_image_ctx.get_snap_id(cls::rbd::UserSnapshotNamespace(), srcname);
959 if (snap_id == CEPH_NOSNAP) {
960 return -ENOENT;
961 }
962 if (m_image_ctx.get_snap_id(cls::rbd::UserSnapshotNamespace(), dstname) != CEPH_NOSNAP) {
963 return -EEXIST;
964 }
965 }
966
967 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
968 r = invoke_async_request("snap_rename", true,
969 boost::bind(&Operations<I>::execute_snap_rename,
970 this, snap_id, dstname, _1),
971 boost::bind(&ImageWatcher<I>::notify_snap_rename,
972 m_image_ctx.image_watcher, snap_id,
973 dstname, _1));
974 if (r < 0 && r != -EEXIST) {
975 return r;
976 }
977 } else {
978 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
979 C_SaferCond cond_ctx;
980 execute_snap_rename(snap_id, dstname, &cond_ctx);
981
982 r = cond_ctx.wait();
983 if (r < 0) {
984 return r;
985 }
986 }
987
988 m_image_ctx.perfcounter->inc(l_librbd_snap_rename);
989 return 0;
990}
991
992template <typename I>
993void Operations<I>::execute_snap_rename(const uint64_t src_snap_id,
994 const std::string &dest_snap_name,
995 Context *on_finish) {
996 assert(m_image_ctx.owner_lock.is_locked());
997 if ((m_image_ctx.features & RBD_FEATURE_JOURNALING) != 0) {
998 assert(m_image_ctx.exclusive_lock == nullptr ||
999 m_image_ctx.exclusive_lock->is_lock_owner());
1000 }
1001
1002 m_image_ctx.snap_lock.get_read();
1003 if (m_image_ctx.get_snap_id(cls::rbd::UserSnapshotNamespace(),
1004 dest_snap_name) != CEPH_NOSNAP) {
1005 // Renaming is supported for snapshots from user namespace only.
1006 m_image_ctx.snap_lock.put_read();
1007 on_finish->complete(-EEXIST);
1008 return;
1009 }
1010 m_image_ctx.snap_lock.put_read();
1011
1012 CephContext *cct = m_image_ctx.cct;
1013 ldout(cct, 5) << this << " " << __func__ << ": "
1014 << "snap_id=" << src_snap_id << ", "
1015 << "new_snap_name=" << dest_snap_name << dendl;
1016
1017 operation::SnapshotRenameRequest<I> *req =
1018 new operation::SnapshotRenameRequest<I>(
1019 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), src_snap_id,
1020 dest_snap_name);
1021 req->send();
1022}
1023
1024template <typename I>
1025int Operations<I>::snap_protect(const cls::rbd::SnapshotNamespace& snap_namespace,
1026 const char *snap_name) {
1027 CephContext *cct = m_image_ctx.cct;
1028 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1029 << dendl;
1030
1031 if (m_image_ctx.read_only) {
1032 return -EROFS;
1033 }
1034
1035 if (!m_image_ctx.test_features(RBD_FEATURE_LAYERING)) {
1036 lderr(cct) << "image must support layering" << dendl;
1037 return -ENOSYS;
1038 }
1039
1040 int r = m_image_ctx.state->refresh_if_required();
1041 if (r < 0) {
1042 return r;
1043 }
1044
1045 {
1046 RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
1047 bool is_protected;
1048 r = m_image_ctx.is_snap_protected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1049 &is_protected);
1050 if (r < 0) {
1051 return r;
1052 }
1053
1054 if (is_protected) {
1055 return -EBUSY;
1056 }
1057 }
1058
1059 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1060 r = invoke_async_request("snap_protect", true,
1061 boost::bind(&Operations<I>::execute_snap_protect,
1062 this, snap_namespace, snap_name, _1),
1063 boost::bind(&ImageWatcher<I>::notify_snap_protect,
1064 m_image_ctx.image_watcher,
1065 snap_namespace, snap_name, _1));
1066 if (r < 0 && r != -EBUSY) {
1067 return r;
1068 }
1069 } else {
1070 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1071 C_SaferCond cond_ctx;
1072 execute_snap_protect(snap_namespace, snap_name, &cond_ctx);
1073
1074 r = cond_ctx.wait();
1075 if (r < 0) {
1076 return r;
1077 }
1078 }
1079 return 0;
1080}
1081
1082template <typename I>
1083void Operations<I>::execute_snap_protect(const cls::rbd::SnapshotNamespace& snap_namespace,
1084 const std::string &snap_name,
1085 Context *on_finish) {
1086 assert(m_image_ctx.owner_lock.is_locked());
1087 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1088 assert(m_image_ctx.exclusive_lock == nullptr ||
1089 m_image_ctx.exclusive_lock->is_lock_owner());
1090 }
1091
1092 m_image_ctx.snap_lock.get_read();
1093 bool is_protected;
1094 int r = m_image_ctx.is_snap_protected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1095 &is_protected);
1096 if (r < 0) {
1097 m_image_ctx.snap_lock.put_read();
1098 on_finish->complete(r);
1099 return;
1100 } else if (is_protected) {
1101 m_image_ctx.snap_lock.put_read();
1102 on_finish->complete(-EBUSY);
1103 return;
1104 }
1105 m_image_ctx.snap_lock.put_read();
1106
1107 CephContext *cct = m_image_ctx.cct;
1108 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1109 << dendl;
1110
1111 operation::SnapshotProtectRequest<I> *request =
1112 new operation::SnapshotProtectRequest<I>(
1113 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), snap_namespace, snap_name);
1114 request->send();
1115}
1116
1117template <typename I>
1118int Operations<I>::snap_unprotect(const cls::rbd::SnapshotNamespace& snap_namespace,
1119 const char *snap_name) {
1120 CephContext *cct = m_image_ctx.cct;
1121 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1122 << dendl;
1123
1124 if (m_image_ctx.read_only) {
1125 return -EROFS;
1126 }
1127
1128 int r = m_image_ctx.state->refresh_if_required();
1129 if (r < 0) {
1130 return r;
1131 }
1132
1133 {
1134 RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
1135 bool is_unprotected;
1136 r = m_image_ctx.is_snap_unprotected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1137 &is_unprotected);
1138 if (r < 0) {
1139 return r;
1140 }
1141
1142 if (is_unprotected) {
1143 return -EINVAL;
1144 }
1145 }
1146
1147 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1148 r = invoke_async_request("snap_unprotect", true,
1149 boost::bind(&Operations<I>::execute_snap_unprotect,
1150 this, snap_namespace, snap_name, _1),
1151 boost::bind(&ImageWatcher<I>::notify_snap_unprotect,
1152 m_image_ctx.image_watcher,
1153 snap_namespace, snap_name, _1));
1154 if (r < 0 && r != -EINVAL) {
1155 return r;
1156 }
1157 } else {
1158 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1159 C_SaferCond cond_ctx;
1160 execute_snap_unprotect(snap_namespace, snap_name, &cond_ctx);
1161
1162 r = cond_ctx.wait();
1163 if (r < 0) {
1164 return r;
1165 }
1166 }
1167 return 0;
1168}
1169
1170template <typename I>
1171void Operations<I>::execute_snap_unprotect(const cls::rbd::SnapshotNamespace& snap_namespace,
1172 const std::string &snap_name,
1173 Context *on_finish) {
1174 assert(m_image_ctx.owner_lock.is_locked());
1175 if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1176 assert(m_image_ctx.exclusive_lock == nullptr ||
1177 m_image_ctx.exclusive_lock->is_lock_owner());
1178 }
1179
1180 m_image_ctx.snap_lock.get_read();
1181 bool is_unprotected;
1182 int r = m_image_ctx.is_snap_unprotected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1183 &is_unprotected);
1184 if (r < 0) {
1185 m_image_ctx.snap_lock.put_read();
1186 on_finish->complete(r);
1187 return;
1188 } else if (is_unprotected) {
1189 m_image_ctx.snap_lock.put_read();
1190 on_finish->complete(-EINVAL);
1191 return;
1192 }
1193 m_image_ctx.snap_lock.put_read();
1194
1195 CephContext *cct = m_image_ctx.cct;
1196 ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1197 << dendl;
1198
1199 operation::SnapshotUnprotectRequest<I> *request =
1200 new operation::SnapshotUnprotectRequest<I>(
1201 m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), snap_namespace, snap_name);
1202 request->send();
1203}
1204
1205template <typename I>
1206int Operations<I>::snap_set_limit(uint64_t limit) {
1207 CephContext *cct = m_image_ctx.cct;
1208 ldout(cct, 5) << this << " " << __func__ << ": limit=" << limit << dendl;
1209
1210 if (m_image_ctx.read_only) {
1211 return -EROFS;
1212 }
1213
1214 int r = m_image_ctx.state->refresh_if_required();
1215 if (r < 0) {
1216 return r;
1217 }
1218
1219 {
1220 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1221 C_SaferCond limit_ctx;
1222
1223 if (m_image_ctx.exclusive_lock != nullptr &&
1224 !m_image_ctx.exclusive_lock->is_lock_owner()) {
1225 C_SaferCond lock_ctx;
1226
1227 m_image_ctx.exclusive_lock->acquire_lock(&lock_ctx);
1228 r = lock_ctx.wait();
1229 if (r < 0) {
1230 return r;
1231 }
1232 }
1233
1234 execute_snap_set_limit(limit, &limit_ctx);
1235 r = limit_ctx.wait();
1236 }
1237
1238 return r;
1239}
1240
1241template <typename I>
1242void Operations<I>::execute_snap_set_limit(const uint64_t limit,
1243 Context *on_finish) {
1244 assert(m_image_ctx.owner_lock.is_locked());
1245
1246 CephContext *cct = m_image_ctx.cct;
1247 ldout(cct, 5) << this << " " << __func__ << ": limit=" << limit
1248 << dendl;
1249
1250 operation::SnapshotLimitRequest<I> *request =
1251 new operation::SnapshotLimitRequest<I>(m_image_ctx, on_finish, limit);
1252 request->send();
1253}
1254
1255template <typename I>
1256int Operations<I>::update_features(uint64_t features, bool enabled) {
1257 CephContext *cct = m_image_ctx.cct;
1258 ldout(cct, 5) << this << " " << __func__ << ": features=" << features
1259 << ", enabled=" << enabled << dendl;
1260
1261 int r = m_image_ctx.state->refresh_if_required();
1262 if (r < 0) {
1263 return r;
1264 }
1265
1266 if (m_image_ctx.read_only) {
1267 return -EROFS;
1268 } else if (m_image_ctx.old_format) {
1269 lderr(cct) << "old-format images do not support features" << dendl;
1270 return -EINVAL;
1271 }
1272
1273 uint64_t disable_mask = (RBD_FEATURES_MUTABLE |
1274 RBD_FEATURES_DISABLE_ONLY);
1275 if ((enabled && (features & RBD_FEATURES_MUTABLE) != features) ||
1276 (!enabled && (features & disable_mask) != features)) {
1277 lderr(cct) << "cannot update immutable features" << dendl;
1278 return -EINVAL;
1279 }
1280 if (features == 0) {
1281 lderr(cct) << "update requires at least one feature" << dendl;
1282 return -EINVAL;
1283 }
1284 {
1285 RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
1286 if (enabled && (features & m_image_ctx.features) != 0) {
1287 lderr(cct) << "one or more requested features are already enabled"
1288 << dendl;
1289 return -EINVAL;
1290 }
1291 if (!enabled && (features & ~m_image_ctx.features) != 0) {
1292 lderr(cct) << "one or more requested features are already disabled"
1293 << dendl;
1294 return -EINVAL;
1295 }
1296 }
1297
1298 // if disabling journaling, avoid attempting to open the journal
1299 // when acquiring the exclusive lock in case the journal is corrupt
1300 bool disabling_journal = false;
1301 if (!enabled && ((features & RBD_FEATURE_JOURNALING) != 0)) {
1302 RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
1303 m_image_ctx.set_journal_policy(new journal::DisabledPolicy());
1304 disabling_journal = true;
1305 }
1306 BOOST_SCOPE_EXIT_ALL( (this)(disabling_journal) ) {
1307 if (disabling_journal) {
1308 RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
1309 m_image_ctx.set_journal_policy(
1310 new journal::StandardPolicy<I>(&m_image_ctx));
1311 }
1312 };
1313
1314 r = invoke_async_request("update_features", false,
1315 boost::bind(&Operations<I>::execute_update_features,
1316 this, features, enabled, _1, 0),
1317 boost::bind(&ImageWatcher<I>::notify_update_features,
1318 m_image_ctx.image_watcher, features,
1319 enabled, _1));
1320 ldout(cct, 2) << "update_features finished" << dendl;
1321 return r;
1322}
1323
1324template <typename I>
1325void Operations<I>::execute_update_features(uint64_t features, bool enabled,
1326 Context *on_finish,
1327 uint64_t journal_op_tid) {
1328 assert(m_image_ctx.owner_lock.is_locked());
1329 assert(m_image_ctx.exclusive_lock == nullptr ||
1330 m_image_ctx.exclusive_lock->is_lock_owner());
1331
1332 CephContext *cct = m_image_ctx.cct;
1333 ldout(cct, 5) << this << " " << __func__ << ": features=" << features
1334 << ", enabled=" << enabled << dendl;
1335
1336 if (enabled) {
1337 operation::EnableFeaturesRequest<I> *req =
1338 new operation::EnableFeaturesRequest<I>(
1339 m_image_ctx, on_finish, journal_op_tid, features);
1340 req->send();
1341 } else {
1342 operation::DisableFeaturesRequest<I> *req =
1343 new operation::DisableFeaturesRequest<I>(
1344 m_image_ctx, on_finish, journal_op_tid, features, false);
1345 req->send();
1346 }
1347}
1348
1349template <typename I>
1350int Operations<I>::metadata_set(const std::string &key,
1351 const std::string &value) {
1352 CephContext *cct = m_image_ctx.cct;
1353 ldout(cct, 5) << this << " " << __func__ << ": key=" << key << ", value="
1354 << value << dendl;
1355
1356 string start = m_image_ctx.METADATA_CONF_PREFIX;
1357 size_t conf_prefix_len = start.size();
1358
1359 if (key.size() > conf_prefix_len && !key.compare(0, conf_prefix_len, start)) {
1360 // validate config setting
1361 string subkey = key.substr(conf_prefix_len, key.size() - conf_prefix_len);
1362 int r = md_config_t().set_val(subkey.c_str(), value);
1363 if (r < 0) {
1364 return r;
1365 }
1366 }
1367
1368 int r = m_image_ctx.state->refresh_if_required();
1369 if (r < 0) {
1370 return r;
1371 }
1372
1373 if (m_image_ctx.read_only) {
1374 return -EROFS;
1375 }
1376
1377 {
1378 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1379 C_SaferCond metadata_ctx;
1380
1381 if (m_image_ctx.exclusive_lock != nullptr &&
1382 !m_image_ctx.exclusive_lock->is_lock_owner()) {
1383 C_SaferCond lock_ctx;
1384
1385 m_image_ctx.exclusive_lock->acquire_lock(&lock_ctx);
1386 r = lock_ctx.wait();
1387 if (r < 0) {
1388 return r;
1389 }
1390 }
1391
1392 execute_metadata_set(key, value, &metadata_ctx);
1393 r = metadata_ctx.wait();
1394 }
1395
1396 return r;
1397}
1398
1399template <typename I>
1400void Operations<I>::execute_metadata_set(const std::string &key,
1401 const std::string &value,
1402 Context *on_finish) {
1403 assert(m_image_ctx.owner_lock.is_locked());
1404
1405 CephContext *cct = m_image_ctx.cct;
1406 ldout(cct, 5) << this << " " << __func__ << ": key=" << key << ", value="
1407 << value << dendl;
1408
1409 operation::MetadataSetRequest<I> *request =
1410 new operation::MetadataSetRequest<I>(m_image_ctx, on_finish, key, value);
1411 request->send();
1412}
1413
1414template <typename I>
1415int Operations<I>::metadata_remove(const std::string &key) {
1416 CephContext *cct = m_image_ctx.cct;
1417 ldout(cct, 5) << this << " " << __func__ << ": key=" << key << dendl;
1418
1419 if (m_image_ctx.read_only) {
1420 return -EROFS;
1421 }
1422
1423 int r = m_image_ctx.state->refresh_if_required();
1424 if (r < 0) {
1425 return r;
1426 }
1427
1428 if (m_image_ctx.read_only) {
1429 return -EROFS;
1430 }
1431
1432 {
1433 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1434 C_SaferCond metadata_ctx;
1435
1436 if (m_image_ctx.exclusive_lock != nullptr &&
1437 !m_image_ctx.exclusive_lock->is_lock_owner()) {
1438 C_SaferCond lock_ctx;
1439
1440 m_image_ctx.exclusive_lock->acquire_lock(&lock_ctx);
1441 r = lock_ctx.wait();
1442 if (r < 0) {
1443 return r;
1444 }
1445 }
1446
1447 execute_metadata_remove(key, &metadata_ctx);
1448 r = metadata_ctx.wait();
1449 }
1450
1451 return r;
1452}
1453
1454template <typename I>
1455void Operations<I>::execute_metadata_remove(const std::string &key,
1456 Context *on_finish) {
1457 assert(m_image_ctx.owner_lock.is_locked());
1458
1459 CephContext *cct = m_image_ctx.cct;
1460 ldout(cct, 5) << this << " " << __func__ << ": key=" << key << dendl;
1461
1462 operation::MetadataRemoveRequest<I> *request =
1463 new operation::MetadataRemoveRequest<I>(m_image_ctx, on_finish, key);
1464 request->send();
1465}
1466
1467template <typename I>
1468int Operations<I>::prepare_image_update() {
1469 assert(m_image_ctx.owner_lock.is_locked() &&
1470 !m_image_ctx.owner_lock.is_wlocked());
1471 if (m_image_ctx.image_watcher == NULL) {
1472 return -EROFS;
1473 }
1474
1475 // need to upgrade to a write lock
1476 int r = 0;
1477 bool trying_lock = false;
1478 C_SaferCond ctx;
1479 m_image_ctx.owner_lock.put_read();
1480 {
1481 RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
1482 if (m_image_ctx.exclusive_lock != nullptr &&
1483 (!m_image_ctx.exclusive_lock->is_lock_owner() ||
1484 !m_image_ctx.exclusive_lock->accept_requests(&r))) {
1485 m_image_ctx.exclusive_lock->try_acquire_lock(&ctx);
1486 trying_lock = true;
1487 }
1488 }
1489
1490 if (trying_lock) {
1491 r = ctx.wait();
1492 }
1493 m_image_ctx.owner_lock.get_read();
1494
1495 return r;
1496}
1497
1498template <typename I>
1499int Operations<I>::invoke_async_request(const std::string& request_type,
1500 bool permit_snapshot,
1501 const boost::function<void(Context*)>& local_request,
1502 const boost::function<void(Context*)>& remote_request) {
1503 C_SaferCond ctx;
1504 C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(m_image_ctx,
1505 request_type,
1506 permit_snapshot,
1507 local_request,
1508 remote_request,
1509 {}, &ctx);
1510 req->send();
1511 return ctx.wait();
1512}
1513
1514} // namespace librbd
1515
1516template class librbd::Operations<librbd::ImageCtx>;