1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/ImageWatcher.h"
5 #include "librbd/ExclusiveLock.h"
6 #include "librbd/ImageCtx.h"
7 #include "librbd/ImageState.h"
8 #include "librbd/internal.h"
9 #include "librbd/TaskFinisher.h"
10 #include "librbd/Types.h"
11 #include "librbd/Utils.h"
12 #include "librbd/asio/ContextWQ.h"
13 #include "librbd/exclusive_lock/Policy.h"
14 #include "librbd/image_watcher/NotifyLockOwner.h"
15 #include "librbd/io/AioCompletion.h"
16 #include "include/encoding.h"
17 #include "common/errno.h"
18 #include <boost/bind/bind.hpp>
20 #define dout_subsys ceph_subsys_rbd
22 #define dout_prefix *_dout << "librbd::ImageWatcher: "
26 using namespace image_watcher
;
27 using namespace watch_notify
;
28 using util::create_async_context_callback
;
29 using util::create_context_callback
;
30 using util::create_rados_callback
;
35 using namespace boost::placeholders
;
37 static const double RETRY_DELAY_SECONDS
= 1.0;
40 struct ImageWatcher
<I
>::C_ProcessPayload
: public Context
{
41 ImageWatcher
*image_watcher
;
44 std::unique_ptr
<watch_notify::Payload
> payload
;
46 C_ProcessPayload(ImageWatcher
*image_watcher
, uint64_t notify_id
,
48 std::unique_ptr
<watch_notify::Payload
> &&payload
)
49 : image_watcher(image_watcher
), notify_id(notify_id
), handle(handle
),
50 payload(std::move(payload
)) {
53 void finish(int r
) override
{
54 image_watcher
->m_async_op_tracker
.start_op();
55 if (image_watcher
->notifications_blocked()) {
56 // requests are blocked -- just ack the notification
58 image_watcher
->acknowledge_notify(notify_id
, handle
, bl
);
60 image_watcher
->process_payload(notify_id
, handle
, payload
.get());
62 image_watcher
->m_async_op_tracker
.finish_op();
67 ImageWatcher
<I
>::ImageWatcher(I
&image_ctx
)
68 : Watcher(image_ctx
.md_ctx
, image_ctx
.op_work_queue
, image_ctx
.header_oid
),
69 m_image_ctx(image_ctx
),
70 m_task_finisher(new TaskFinisher
<Task
>(*m_image_ctx
.cct
)),
71 m_async_request_lock(ceph::make_shared_mutex(
72 util::unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this))),
73 m_owner_client_id_lock(ceph::make_mutex(
74 util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this)))
79 ImageWatcher
<I
>::~ImageWatcher()
81 delete m_task_finisher
;
85 void ImageWatcher
<I
>::unregister_watch(Context
*on_finish
) {
86 CephContext
*cct
= m_image_ctx
.cct
;
87 ldout(cct
, 10) << this << " unregistering image watcher" << dendl
;
89 cancel_async_requests();
91 // flush the task finisher queue before completing
92 on_finish
= create_async_context_callback(m_task_finisher
, on_finish
);
94 on_finish
= new LambdaContext([this, on_finish
](int r
) {
95 cancel_quiesce_requests();
96 m_task_finisher
->cancel_all();
97 m_async_op_tracker
.wait_for_ops(on_finish
);
99 Watcher::unregister_watch(on_finish
);
102 template <typename I
>
103 void ImageWatcher
<I
>::block_notifies(Context
*on_finish
) {
104 CephContext
*cct
= m_image_ctx
.cct
;
105 ldout(cct
, 10) << this << " " << __func__
<< dendl
;
107 on_finish
= new LambdaContext([this, on_finish
](int r
) {
108 cancel_async_requests();
109 on_finish
->complete(r
);
111 Watcher::block_notifies(on_finish
);
114 template <typename I
>
115 void ImageWatcher
<I
>::schedule_async_progress(const AsyncRequestId
&request
,
116 uint64_t offset
, uint64_t total
) {
117 auto ctx
= new LambdaContext([this, request
, offset
, total
](int r
) {
118 if (r
!= -ECANCELED
) {
119 notify_async_progress(request
, offset
, total
);
122 m_task_finisher
->queue(Task(TASK_CODE_ASYNC_PROGRESS
, request
), ctx
);
125 template <typename I
>
126 int ImageWatcher
<I
>::notify_async_progress(const AsyncRequestId
&request
,
127 uint64_t offset
, uint64_t total
) {
128 ldout(m_image_ctx
.cct
, 20) << this << " remote async request progress: "
129 << request
<< " @ " << offset
130 << "/" << total
<< dendl
;
132 send_notify(new AsyncProgressPayload(request
, offset
, total
));
136 template <typename I
>
137 void ImageWatcher
<I
>::schedule_async_complete(const AsyncRequestId
&request
,
139 m_async_op_tracker
.start_op();
140 auto ctx
= new LambdaContext([this, request
, ret_val
=r
](int r
) {
141 if (r
!= -ECANCELED
) {
142 notify_async_complete(request
, ret_val
);
145 m_task_finisher
->queue(ctx
);
148 template <typename I
>
149 void ImageWatcher
<I
>::notify_async_complete(const AsyncRequestId
&request
,
151 ldout(m_image_ctx
.cct
, 20) << this << " remote async request finished: "
152 << request
<< "=" << r
<< dendl
;
154 send_notify(new AsyncCompletePayload(request
, r
),
155 new LambdaContext(boost::bind(&ImageWatcher
<I
>::handle_async_complete
,
156 this, request
, r
, _1
)));
159 template <typename I
>
160 void ImageWatcher
<I
>::handle_async_complete(const AsyncRequestId
&request
,
161 int r
, int ret_val
) {
162 ldout(m_image_ctx
.cct
, 20) << this << " " << __func__
<< ": "
163 << "request=" << request
<< ", r=" << ret_val
166 lderr(m_image_ctx
.cct
) << this << " failed to notify async complete: "
167 << cpp_strerror(ret_val
) << dendl
;
168 if (ret_val
== -ETIMEDOUT
&& !is_unregistered()) {
169 schedule_async_complete(request
, r
);
170 m_async_op_tracker
.finish_op();
175 std::unique_lock async_request_locker
{m_async_request_lock
};
176 mark_async_request_complete(request
, r
);
177 m_async_op_tracker
.finish_op();
180 template <typename I
>
181 void ImageWatcher
<I
>::notify_flatten(uint64_t request_id
,
182 ProgressContext
&prog_ctx
,
183 Context
*on_finish
) {
184 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
185 ceph_assert(m_image_ctx
.exclusive_lock
&&
186 !m_image_ctx
.exclusive_lock
->is_lock_owner());
188 AsyncRequestId
async_request_id(get_client_id(), request_id
);
190 notify_async_request(async_request_id
, new FlattenPayload(async_request_id
),
191 prog_ctx
, on_finish
);
194 template <typename I
>
195 void ImageWatcher
<I
>::notify_resize(uint64_t request_id
, uint64_t size
,
197 ProgressContext
&prog_ctx
,
198 Context
*on_finish
) {
199 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
200 ceph_assert(m_image_ctx
.exclusive_lock
&&
201 !m_image_ctx
.exclusive_lock
->is_lock_owner());
203 AsyncRequestId
async_request_id(get_client_id(), request_id
);
205 notify_async_request(async_request_id
,
206 new ResizePayload(async_request_id
, size
, allow_shrink
),
207 prog_ctx
, on_finish
);
210 template <typename I
>
211 void ImageWatcher
<I
>::notify_snap_create(uint64_t request_id
,
212 const cls::rbd::SnapshotNamespace
&snap_namespace
,
213 const std::string
&snap_name
,
215 ProgressContext
&prog_ctx
,
216 Context
*on_finish
) {
217 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
218 ceph_assert(m_image_ctx
.exclusive_lock
&&
219 !m_image_ctx
.exclusive_lock
->is_lock_owner());
221 AsyncRequestId
async_request_id(get_client_id(), request_id
);
223 notify_async_request(async_request_id
,
224 new SnapCreatePayload(async_request_id
, snap_namespace
,
226 prog_ctx
, on_finish
);
229 template <typename I
>
230 void ImageWatcher
<I
>::notify_snap_rename(uint64_t request_id
,
231 const snapid_t
&src_snap_id
,
232 const std::string
&dst_snap_name
,
233 Context
*on_finish
) {
234 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
235 ceph_assert(m_image_ctx
.exclusive_lock
&&
236 !m_image_ctx
.exclusive_lock
->is_lock_owner());
238 AsyncRequestId
async_request_id(get_client_id(), request_id
);
240 notify_async_request(
242 new SnapRenamePayload(async_request_id
, src_snap_id
, dst_snap_name
),
243 m_no_op_prog_ctx
, on_finish
);
246 template <typename I
>
247 void ImageWatcher
<I
>::notify_snap_remove(
248 uint64_t request_id
, const cls::rbd::SnapshotNamespace
&snap_namespace
,
249 const std::string
&snap_name
, Context
*on_finish
) {
250 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
251 ceph_assert(m_image_ctx
.exclusive_lock
&&
252 !m_image_ctx
.exclusive_lock
->is_lock_owner());
254 AsyncRequestId
async_request_id(get_client_id(), request_id
);
256 notify_async_request(
258 new SnapRemovePayload(async_request_id
, snap_namespace
, snap_name
),
259 m_no_op_prog_ctx
, on_finish
);
262 template <typename I
>
263 void ImageWatcher
<I
>::notify_snap_protect(
264 uint64_t request_id
, const cls::rbd::SnapshotNamespace
&snap_namespace
,
265 const std::string
&snap_name
, Context
*on_finish
) {
266 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
267 ceph_assert(m_image_ctx
.exclusive_lock
&&
268 !m_image_ctx
.exclusive_lock
->is_lock_owner());
270 AsyncRequestId
async_request_id(get_client_id(), request_id
);
272 notify_async_request(
274 new SnapProtectPayload(async_request_id
, snap_namespace
, snap_name
),
275 m_no_op_prog_ctx
, on_finish
);
278 template <typename I
>
279 void ImageWatcher
<I
>::notify_snap_unprotect(
280 uint64_t request_id
, const cls::rbd::SnapshotNamespace
&snap_namespace
,
281 const std::string
&snap_name
, Context
*on_finish
) {
282 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
283 ceph_assert(m_image_ctx
.exclusive_lock
&&
284 !m_image_ctx
.exclusive_lock
->is_lock_owner());
286 AsyncRequestId
async_request_id(get_client_id(), request_id
);
288 notify_async_request(
290 new SnapUnprotectPayload(async_request_id
, snap_namespace
, snap_name
),
291 m_no_op_prog_ctx
, on_finish
);
294 template <typename I
>
295 void ImageWatcher
<I
>::notify_rebuild_object_map(uint64_t request_id
,
296 ProgressContext
&prog_ctx
,
297 Context
*on_finish
) {
298 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
299 ceph_assert(m_image_ctx
.exclusive_lock
&&
300 !m_image_ctx
.exclusive_lock
->is_lock_owner());
302 AsyncRequestId
async_request_id(get_client_id(), request_id
);
304 notify_async_request(async_request_id
,
305 new RebuildObjectMapPayload(async_request_id
),
306 prog_ctx
, on_finish
);
309 template <typename I
>
310 void ImageWatcher
<I
>::notify_rename(uint64_t request_id
,
311 const std::string
&image_name
,
312 Context
*on_finish
) {
313 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
314 ceph_assert(m_image_ctx
.exclusive_lock
&&
315 !m_image_ctx
.exclusive_lock
->is_lock_owner());
317 AsyncRequestId
async_request_id(get_client_id(), request_id
);
319 notify_async_request(async_request_id
,
320 new RenamePayload(async_request_id
, image_name
),
321 m_no_op_prog_ctx
, on_finish
);
324 template <typename I
>
325 void ImageWatcher
<I
>::notify_update_features(uint64_t request_id
,
326 uint64_t features
, bool enabled
,
327 Context
*on_finish
) {
328 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
329 ceph_assert(m_image_ctx
.exclusive_lock
&&
330 !m_image_ctx
.exclusive_lock
->is_lock_owner());
332 AsyncRequestId
async_request_id(get_client_id(), request_id
);
334 notify_async_request(async_request_id
,
335 new UpdateFeaturesPayload(async_request_id
, features
, enabled
),
336 m_no_op_prog_ctx
, on_finish
);
339 template <typename I
>
340 void ImageWatcher
<I
>::notify_migrate(uint64_t request_id
,
341 ProgressContext
&prog_ctx
,
342 Context
*on_finish
) {
343 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
344 ceph_assert(m_image_ctx
.exclusive_lock
&&
345 !m_image_ctx
.exclusive_lock
->is_lock_owner());
347 AsyncRequestId
async_request_id(get_client_id(), request_id
);
349 notify_async_request(async_request_id
, new MigratePayload(async_request_id
),
350 prog_ctx
, on_finish
);
353 template <typename I
>
354 void ImageWatcher
<I
>::notify_sparsify(uint64_t request_id
, size_t sparse_size
,
355 ProgressContext
&prog_ctx
,
356 Context
*on_finish
) {
357 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
358 ceph_assert(m_image_ctx
.exclusive_lock
&&
359 !m_image_ctx
.exclusive_lock
->is_lock_owner());
361 AsyncRequestId
async_request_id(get_client_id(), request_id
);
363 notify_async_request(async_request_id
,
364 new SparsifyPayload(async_request_id
, sparse_size
),
365 prog_ctx
, on_finish
);
368 template <typename I
>
369 void ImageWatcher
<I
>::notify_header_update(Context
*on_finish
) {
370 ldout(m_image_ctx
.cct
, 10) << this << ": " << __func__
<< dendl
;
372 // supports legacy (empty buffer) clients
373 send_notify(new HeaderUpdatePayload(), on_finish
);
376 template <typename I
>
377 void ImageWatcher
<I
>::notify_header_update(librados::IoCtx
&io_ctx
,
378 const std::string
&oid
) {
379 // supports legacy (empty buffer) clients
381 encode(NotifyMessage(new HeaderUpdatePayload()), bl
);
382 io_ctx
.notify2(oid
, bl
, watcher::Notifier::NOTIFY_TIMEOUT
, nullptr);
385 template <typename I
>
386 void ImageWatcher
<I
>::notify_quiesce(uint64_t *request_id
,
387 ProgressContext
&prog_ctx
,
388 Context
*on_finish
) {
389 *request_id
= util::reserve_async_request_id();
391 ldout(m_image_ctx
.cct
, 10) << this << " " << __func__
<< ": request_id="
392 << request_id
<< dendl
;
394 AsyncRequestId
async_request_id(get_client_id(), *request_id
);
396 auto attempts
= m_image_ctx
.config
.template get_val
<uint64_t>(
397 "rbd_quiesce_notification_attempts");
399 notify_quiesce(async_request_id
, attempts
, prog_ctx
, on_finish
);
402 template <typename I
>
403 void ImageWatcher
<I
>::notify_quiesce(const AsyncRequestId
&async_request_id
,
404 size_t attempts
, ProgressContext
&prog_ctx
,
405 Context
*on_finish
) {
406 ldout(m_image_ctx
.cct
, 10) << this << " " << __func__
<< ": async_request_id="
407 << async_request_id
<< " attempts=" << attempts
410 ceph_assert(attempts
> 0);
411 auto notify_response
= new watcher::NotifyResponse();
412 auto on_notify
= new LambdaContext(
413 [notify_response
=std::unique_ptr
<watcher::NotifyResponse
>(notify_response
),
414 this, async_request_id
, &prog_ctx
, on_finish
, attempts
=attempts
-1](int r
) {
415 auto total_attempts
= m_image_ctx
.config
.template get_val
<uint64_t>(
416 "rbd_quiesce_notification_attempts");
417 if (total_attempts
< attempts
) {
418 total_attempts
= attempts
;
420 prog_ctx
.update_progress(total_attempts
- attempts
, total_attempts
);
422 if (r
== -ETIMEDOUT
) {
423 ldout(m_image_ctx
.cct
, 10) << this << " " << __func__
<< ": async_request_id="
424 << async_request_id
<< " timed out" << dendl
;
426 notify_quiesce(async_request_id
, attempts
, prog_ctx
, on_finish
);
430 for (auto &[client_id
, bl
] : notify_response
->acks
) {
431 if (bl
.length() == 0) {
435 auto iter
= bl
.cbegin();
437 ResponseMessage response_message
;
439 decode(response_message
, iter
);
441 if (response_message
.result
!= -EOPNOTSUPP
) {
442 r
= response_message
.result
;
444 } catch (const buffer::error
&err
) {
453 lderr(m_image_ctx
.cct
) << this << " failed to notify quiesce: "
454 << cpp_strerror(r
) << dendl
;
456 on_finish
->complete(r
);
460 encode(NotifyMessage(new QuiescePayload(async_request_id
)), bl
);
461 Watcher::send_notify(bl
, notify_response
, on_notify
);
464 template <typename I
>
465 void ImageWatcher
<I
>::notify_unquiesce(uint64_t request_id
, Context
*on_finish
) {
466 ldout(m_image_ctx
.cct
, 10) << this << " " << __func__
<< ": request_id="
467 << request_id
<< dendl
;
469 AsyncRequestId
async_request_id(get_client_id(), request_id
);
471 send_notify(new UnquiescePayload(async_request_id
), on_finish
);
474 template <typename I
>
475 void ImageWatcher
<I
>::notify_metadata_set(uint64_t request_id
,
476 const std::string
&key
,
477 const std::string
&value
,
478 Context
*on_finish
) {
479 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
480 ceph_assert(m_image_ctx
.exclusive_lock
&&
481 !m_image_ctx
.exclusive_lock
->is_lock_owner());
483 AsyncRequestId
async_request_id(get_client_id(), request_id
);
485 notify_async_request(
487 new MetadataUpdatePayload(async_request_id
, key
,
488 std::optional
<std::string
>{value
}),
489 m_no_op_prog_ctx
, on_finish
);
492 template <typename I
>
493 void ImageWatcher
<I
>::notify_metadata_remove(uint64_t request_id
,
494 const std::string
&key
,
495 Context
*on_finish
) {
496 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
497 ceph_assert(m_image_ctx
.exclusive_lock
&&
498 !m_image_ctx
.exclusive_lock
->is_lock_owner());
500 AsyncRequestId
async_request_id(get_client_id(), request_id
);
502 notify_async_request(
504 new MetadataUpdatePayload(async_request_id
, key
, std::nullopt
),
505 m_no_op_prog_ctx
, on_finish
);
508 template <typename I
>
509 void ImageWatcher
<I
>::schedule_cancel_async_requests() {
510 auto ctx
= new LambdaContext([this](int r
) {
511 if (r
!= -ECANCELED
) {
512 cancel_async_requests();
515 m_task_finisher
->queue(TASK_CODE_CANCEL_ASYNC_REQUESTS
, ctx
);
518 template <typename I
>
519 void ImageWatcher
<I
>::cancel_async_requests() {
520 std::unique_lock l
{m_async_request_lock
};
521 for (auto iter
= m_async_requests
.begin(); iter
!= m_async_requests
.end(); ) {
522 if (iter
->second
.second
== nullptr) {
523 // Quiesce notify request. Skip.
526 iter
->second
.first
->complete(-ERESTART
);
527 iter
= m_async_requests
.erase(iter
);
532 template <typename I
>
533 void ImageWatcher
<I
>::set_owner_client_id(const ClientId
& client_id
) {
534 ceph_assert(ceph_mutex_is_locked(m_owner_client_id_lock
));
535 m_owner_client_id
= client_id
;
536 ldout(m_image_ctx
.cct
, 10) << this << " current lock owner: "
537 << m_owner_client_id
<< dendl
;
540 template <typename I
>
541 ClientId ImageWatcher
<I
>::get_client_id() {
542 std::shared_lock l
{this->m_watch_lock
};
543 return ClientId(m_image_ctx
.md_ctx
.get_instance_id(), this->m_watch_handle
);
546 template <typename I
>
547 void ImageWatcher
<I
>::notify_acquired_lock() {
548 ldout(m_image_ctx
.cct
, 10) << this << " notify acquired lock" << dendl
;
550 ClientId client_id
= get_client_id();
552 std::lock_guard owner_client_id_locker
{m_owner_client_id_lock
};
553 set_owner_client_id(client_id
);
556 send_notify(new AcquiredLockPayload(client_id
));
559 template <typename I
>
560 void ImageWatcher
<I
>::notify_released_lock() {
561 ldout(m_image_ctx
.cct
, 10) << this << " notify released lock" << dendl
;
564 std::lock_guard owner_client_id_locker
{m_owner_client_id_lock
};
565 set_owner_client_id(ClientId());
568 send_notify(new ReleasedLockPayload(get_client_id()));
571 template <typename I
>
572 void ImageWatcher
<I
>::schedule_request_lock(bool use_timer
, int timer_delay
) {
573 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
575 // see notify_request_lock()
576 if (m_image_ctx
.exclusive_lock
== nullptr ||
577 m_image_ctx
.exclusive_lock
->is_lock_owner()) {
581 std::shared_lock watch_locker
{this->m_watch_lock
};
582 if (this->is_registered(this->m_watch_lock
)) {
583 ldout(m_image_ctx
.cct
, 15) << this << " requesting exclusive lock" << dendl
;
585 auto ctx
= new LambdaContext([this](int r
) {
586 if (r
!= -ECANCELED
) {
587 notify_request_lock();
592 if (timer_delay
< 0) {
593 timer_delay
= RETRY_DELAY_SECONDS
;
595 m_task_finisher
->add_event_after(TASK_CODE_REQUEST_LOCK
,
598 m_task_finisher
->queue(TASK_CODE_REQUEST_LOCK
, ctx
);
603 template <typename I
>
604 void ImageWatcher
<I
>::notify_request_lock() {
605 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
606 std::shared_lock image_locker
{m_image_ctx
.image_lock
};
608 // ExclusiveLock state machine can be dynamically disabled or
609 // race with task cancel
610 if (m_image_ctx
.exclusive_lock
== nullptr ||
611 m_image_ctx
.exclusive_lock
->is_lock_owner()) {
615 ldout(m_image_ctx
.cct
, 10) << this << " notify request lock" << dendl
;
617 notify_lock_owner(new RequestLockPayload(get_client_id(), false),
618 create_context_callback
<
619 ImageWatcher
, &ImageWatcher
<I
>::handle_request_lock
>(this));
622 template <typename I
>
623 void ImageWatcher
<I
>::handle_request_lock(int r
) {
624 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
625 std::shared_lock image_locker
{m_image_ctx
.image_lock
};
627 // ExclusiveLock state machine cannot transition -- but can be
628 // dynamically disabled
629 if (m_image_ctx
.exclusive_lock
== nullptr) {
633 if (r
== -ETIMEDOUT
) {
634 ldout(m_image_ctx
.cct
, 5) << this << " timed out requesting lock: retrying"
637 // treat this is a dead client -- so retest acquiring the lock
638 m_image_ctx
.exclusive_lock
->handle_peer_notification(0);
639 } else if (r
== -EROFS
) {
640 ldout(m_image_ctx
.cct
, 5) << this << " peer will not release lock" << dendl
;
641 m_image_ctx
.exclusive_lock
->handle_peer_notification(r
);
643 lderr(m_image_ctx
.cct
) << this << " error requesting lock: "
644 << cpp_strerror(r
) << dendl
;
645 schedule_request_lock(true);
647 // lock owner acked -- but resend if we don't see them release the lock
648 int retry_timeout
= m_image_ctx
.cct
->_conf
.template get_val
<int64_t>(
649 "client_notify_timeout");
650 ldout(m_image_ctx
.cct
, 15) << this << " will retry in " << retry_timeout
651 << " seconds" << dendl
;
652 schedule_request_lock(true, retry_timeout
);
656 template <typename I
>
657 void ImageWatcher
<I
>::notify_lock_owner(Payload
*payload
, Context
*on_finish
) {
658 ceph_assert(on_finish
!= nullptr);
659 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
662 encode(NotifyMessage(payload
), bl
);
664 NotifyLockOwner
*notify_lock_owner
= NotifyLockOwner::create(
665 m_image_ctx
, this->m_notifier
, std::move(bl
), on_finish
);
666 notify_lock_owner
->send();
669 template <typename I
>
670 bool ImageWatcher
<I
>::is_new_request(const AsyncRequestId
&id
) const {
671 ceph_assert(ceph_mutex_is_locked(m_async_request_lock
));
673 return m_async_pending
.count(id
) == 0 && m_async_complete
.count(id
) == 0;
676 template <typename I
>
677 bool ImageWatcher
<I
>::mark_async_request_complete(const AsyncRequestId
&id
,
679 ceph_assert(ceph_mutex_is_locked(m_async_request_lock
));
681 bool found
= m_async_pending
.erase(id
);
683 auto now
= ceph_clock_now();
685 auto it
= m_async_complete_expiration
.begin();
686 while (it
!= m_async_complete_expiration
.end() && it
->first
< now
) {
687 m_async_complete
.erase(it
->second
);
688 it
= m_async_complete_expiration
.erase(it
);
691 if (!m_async_complete
.insert({id
, r
}).second
) {
692 for (it
= m_async_complete_expiration
.begin();
693 it
!= m_async_complete_expiration
.end(); it
++) {
694 if (it
->second
== id
) {
695 m_async_complete_expiration
.erase(it
);
700 auto expiration_time
= now
;
701 expiration_time
+= 600;
702 m_async_complete_expiration
.insert({expiration_time
, id
});
707 template <typename I
>
708 Context
*ImageWatcher
<I
>::remove_async_request(const AsyncRequestId
&id
) {
709 std::unique_lock async_request_locker
{m_async_request_lock
};
711 return remove_async_request(id
, m_async_request_lock
);
714 template <typename I
>
715 Context
*ImageWatcher
<I
>::remove_async_request(const AsyncRequestId
&id
,
716 ceph::shared_mutex
&lock
) {
717 ceph_assert(ceph_mutex_is_locked(lock
));
719 ldout(m_image_ctx
.cct
, 20) << __func__
<< ": " << id
<< dendl
;
721 auto it
= m_async_requests
.find(id
);
722 if (it
!= m_async_requests
.end()) {
723 Context
*on_complete
= it
->second
.first
;
724 m_async_requests
.erase(it
);
730 template <typename I
>
731 void ImageWatcher
<I
>::schedule_async_request_timed_out(const AsyncRequestId
&id
) {
732 ldout(m_image_ctx
.cct
, 20) << "scheduling async request time out: " << id
735 auto ctx
= new LambdaContext([this, id
](int r
) {
736 if (r
!= -ECANCELED
) {
737 async_request_timed_out(id
);
741 Task
task(TASK_CODE_ASYNC_REQUEST
, id
);
742 m_task_finisher
->cancel(task
);
744 m_task_finisher
->add_event_after(
745 task
, m_image_ctx
.config
.template get_val
<uint64_t>("rbd_request_timed_out_seconds"),
749 template <typename I
>
750 void ImageWatcher
<I
>::async_request_timed_out(const AsyncRequestId
&id
) {
751 Context
*on_complete
= remove_async_request(id
);
752 if (on_complete
!= nullptr) {
753 ldout(m_image_ctx
.cct
, 5) << "async request timed out: " << id
<< dendl
;
754 m_image_ctx
.op_work_queue
->queue(on_complete
, -ETIMEDOUT
);
758 template <typename I
>
759 void ImageWatcher
<I
>::notify_async_request(
760 const AsyncRequestId
&async_request_id
, Payload
*payload
,
761 ProgressContext
& prog_ctx
, Context
*on_finish
) {
762 ceph_assert(on_finish
!= nullptr);
763 ceph_assert(ceph_mutex_is_locked(m_image_ctx
.owner_lock
));
765 ldout(m_image_ctx
.cct
, 10) << this << " async request: " << async_request_id
768 Context
*on_notify
= new LambdaContext([this, async_request_id
](int r
) {
770 // notification failed -- don't expect updates
771 Context
*on_complete
= remove_async_request(async_request_id
);
772 if (on_complete
!= nullptr) {
773 on_complete
->complete(r
);
778 Context
*on_complete
= new LambdaContext(
779 [this, async_request_id
, on_finish
](int r
) {
780 m_task_finisher
->cancel(Task(TASK_CODE_ASYNC_REQUEST
, async_request_id
));
781 on_finish
->complete(r
);
785 std::unique_lock async_request_locker
{m_async_request_lock
};
786 m_async_requests
[async_request_id
] = AsyncRequest(on_complete
, &prog_ctx
);
789 schedule_async_request_timed_out(async_request_id
);
790 notify_lock_owner(payload
, on_notify
);
793 template <typename I
>
794 int ImageWatcher
<I
>::prepare_async_request(const AsyncRequestId
& async_request_id
,
795 bool* new_request
, Context
** ctx
,
796 ProgressContext
** prog_ctx
) {
797 if (async_request_id
.client_id
== get_client_id()) {
800 std::unique_lock l
{m_async_request_lock
};
801 if (is_new_request(async_request_id
)) {
802 m_async_pending
.insert(async_request_id
);
804 *prog_ctx
= new RemoteProgressContext(*this, async_request_id
);
805 *ctx
= new RemoteContext(*this, async_request_id
, *prog_ctx
);
807 *new_request
= false;
808 auto it
= m_async_complete
.find(async_request_id
);
809 if (it
!= m_async_complete
.end()) {
811 // reset complete request expiration time
812 mark_async_request_complete(async_request_id
, r
);
820 template <typename I
>
821 Context
*ImageWatcher
<I
>::prepare_quiesce_request(
822 const AsyncRequestId
&request
, C_NotifyAck
*ack_ctx
) {
823 std::unique_lock locker
{m_async_request_lock
};
825 auto timeout
= 2 * watcher::Notifier::NOTIFY_TIMEOUT
/ 1000;
827 if (!is_new_request(request
)) {
828 auto it
= m_async_requests
.find(request
);
829 if (it
!= m_async_requests
.end()) {
830 delete it
->second
.first
;
831 it
->second
.first
= ack_ctx
;
833 auto it
= m_async_complete
.find(request
);
834 ceph_assert(it
!= m_async_complete
.end());
835 m_task_finisher
->queue(new C_ResponseMessage(ack_ctx
), it
->second
);
836 // reset complete request expiration time
837 mark_async_request_complete(request
, it
->second
);
841 m_task_finisher
->reschedule_event_after(Task(TASK_CODE_QUIESCE
, request
),
846 m_async_pending
.insert(request
);
847 m_async_requests
[request
] = AsyncRequest(ack_ctx
, nullptr);
848 m_async_op_tracker
.start_op();
850 return new LambdaContext(
851 [this, request
, timeout
](int r
) {
852 auto unquiesce_ctx
= new LambdaContext(
853 [this, request
](int r
) {
855 ldout(m_image_ctx
.cct
, 10) << this << " quiesce request "
856 << request
<< " timed out" << dendl
;
859 auto on_finish
= new LambdaContext(
861 m_async_op_tracker
.finish_op();
864 m_image_ctx
.state
->notify_unquiesce(on_finish
);
867 m_task_finisher
->add_event_after(Task(TASK_CODE_QUIESCE
, request
),
868 timeout
, unquiesce_ctx
);
870 std::unique_lock async_request_locker
{m_async_request_lock
};
871 mark_async_request_complete(request
, r
);
872 auto ctx
= remove_async_request(request
, m_async_request_lock
);
873 async_request_locker
.unlock();
874 if (ctx
!= nullptr) {
875 ctx
= new C_ResponseMessage(static_cast<C_NotifyAck
*>(ctx
));
878 m_task_finisher
->cancel(Task(TASK_CODE_QUIESCE
, request
));
883 template <typename I
>
884 void ImageWatcher
<I
>::prepare_unquiesce_request(const AsyncRequestId
&request
) {
886 std::unique_lock async_request_locker
{m_async_request_lock
};
887 auto it
= m_async_complete
.find(request
);
888 if (it
== m_async_complete
.end()) {
889 ldout(m_image_ctx
.cct
, 20) << this << " " << request
890 << ": not found in complete" << dendl
;
893 // reset complete request expiration time
894 mark_async_request_complete(request
, it
->second
);
897 bool canceled
= m_task_finisher
->cancel(Task(TASK_CODE_QUIESCE
, request
));
899 ldout(m_image_ctx
.cct
, 20) << this << " " << request
900 << ": timer task not found" << dendl
;
904 template <typename I
>
905 void ImageWatcher
<I
>::cancel_quiesce_requests() {
906 std::unique_lock l
{m_async_request_lock
};
907 for (auto it
= m_async_requests
.begin(); it
!= m_async_requests
.end(); ) {
908 if (it
->second
.second
== nullptr) {
909 // Quiesce notify request.
910 mark_async_request_complete(it
->first
, 0);
911 delete it
->second
.first
;
912 it
= m_async_requests
.erase(it
);
919 template <typename I
>
920 bool ImageWatcher
<I
>::handle_operation_request(
921 const AsyncRequestId
& async_request_id
,
922 exclusive_lock::OperationRequestType request_type
, Operation operation
,
923 std::function
<void(ProgressContext
&prog_ctx
, Context
*)> execute
,
924 C_NotifyAck
*ack_ctx
) {
925 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
927 if (m_image_ctx
.exclusive_lock
!= nullptr) {
929 if (m_image_ctx
.exclusive_lock
->accept_request(request_type
, &r
)) {
932 ProgressContext
*prog_ctx
;
934 if (async_request_id
) {
935 r
= prepare_async_request(async_request_id
, &new_request
, &ctx
,
937 encode(ResponseMessage(r
), ack_ctx
->out
);
941 ctx
= new C_ResponseMessage(ack_ctx
);
942 prog_ctx
= &m_no_op_prog_ctx
;
945 if (r
== 0 && new_request
) {
946 ctx
= new LambdaContext(
947 [this, operation
, ctx
](int r
) {
948 m_image_ctx
.operations
->finish_op(operation
, r
);
951 ctx
= new LambdaContext(
952 [this, execute
, prog_ctx
, ctx
](int r
) {
957 std::shared_lock l
{m_image_ctx
.owner_lock
};
958 execute(*prog_ctx
, ctx
);
960 m_image_ctx
.operations
->start_op(operation
, ctx
);
964 encode(ResponseMessage(r
), ack_ctx
->out
);
970 template <typename I
>
971 bool ImageWatcher
<I
>::handle_payload(const HeaderUpdatePayload
&payload
,
972 C_NotifyAck
*ack_ctx
) {
973 ldout(m_image_ctx
.cct
, 10) << this << " image header updated" << dendl
;
975 m_image_ctx
.state
->handle_update_notification();
976 m_image_ctx
.perfcounter
->inc(l_librbd_notify
);
977 if (ack_ctx
!= nullptr) {
978 m_image_ctx
.state
->flush_update_watchers(new C_ResponseMessage(ack_ctx
));
984 template <typename I
>
985 bool ImageWatcher
<I
>::handle_payload(const AcquiredLockPayload
&payload
,
986 C_NotifyAck
*ack_ctx
) {
987 ldout(m_image_ctx
.cct
, 10) << this << " image exclusively locked announcement"
990 bool cancel_async_requests
= true;
991 if (payload
.client_id
.is_valid()) {
992 std::lock_guard owner_client_id_locker
{m_owner_client_id_lock
};
993 if (payload
.client_id
== m_owner_client_id
) {
994 cancel_async_requests
= false;
996 set_owner_client_id(payload
.client_id
);
999 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
1000 if (m_image_ctx
.exclusive_lock
!= nullptr) {
1001 // potentially wake up the exclusive lock state machine now that
1002 // a lock owner has advertised itself
1003 m_image_ctx
.exclusive_lock
->handle_peer_notification(0);
1005 if (cancel_async_requests
&&
1006 (m_image_ctx
.exclusive_lock
== nullptr ||
1007 !m_image_ctx
.exclusive_lock
->is_lock_owner())) {
1008 schedule_cancel_async_requests();
1013 template <typename I
>
1014 bool ImageWatcher
<I
>::handle_payload(const ReleasedLockPayload
&payload
,
1015 C_NotifyAck
*ack_ctx
) {
1016 ldout(m_image_ctx
.cct
, 10) << this << " exclusive lock released" << dendl
;
1018 bool cancel_async_requests
= true;
1019 if (payload
.client_id
.is_valid()) {
1020 std::lock_guard l
{m_owner_client_id_lock
};
1021 if (payload
.client_id
!= m_owner_client_id
) {
1022 ldout(m_image_ctx
.cct
, 10) << this << " unexpected owner: "
1023 << payload
.client_id
<< " != "
1024 << m_owner_client_id
<< dendl
;
1025 cancel_async_requests
= false;
1027 set_owner_client_id(ClientId());
1031 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
1032 if (cancel_async_requests
&&
1033 (m_image_ctx
.exclusive_lock
== nullptr ||
1034 !m_image_ctx
.exclusive_lock
->is_lock_owner())) {
1035 schedule_cancel_async_requests();
1038 // alert the exclusive lock state machine that the lock is available
1039 if (m_image_ctx
.exclusive_lock
!= nullptr &&
1040 !m_image_ctx
.exclusive_lock
->is_lock_owner()) {
1041 m_task_finisher
->cancel(TASK_CODE_REQUEST_LOCK
);
1042 m_image_ctx
.exclusive_lock
->handle_peer_notification(0);
1047 template <typename I
>
1048 bool ImageWatcher
<I
>::handle_payload(const RequestLockPayload
&payload
,
1049 C_NotifyAck
*ack_ctx
) {
1050 ldout(m_image_ctx
.cct
, 10) << this << " exclusive lock requested" << dendl
;
1051 if (payload
.client_id
== get_client_id()) {
1055 std::shared_lock l
{m_image_ctx
.owner_lock
};
1056 if (m_image_ctx
.exclusive_lock
!= nullptr &&
1057 m_image_ctx
.exclusive_lock
->is_lock_owner()) {
1059 bool accept_request
= m_image_ctx
.exclusive_lock
->accept_request(
1060 exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
, &r
);
1062 if (accept_request
) {
1063 ceph_assert(r
== 0);
1064 std::lock_guard owner_client_id_locker
{m_owner_client_id_lock
};
1065 if (!m_owner_client_id
.is_valid()) {
1069 ldout(m_image_ctx
.cct
, 10) << this << " queuing release of exclusive lock"
1071 r
= m_image_ctx
.get_exclusive_lock_policy()->lock_requested(
1074 encode(ResponseMessage(r
), ack_ctx
->out
);
1079 template <typename I
>
1080 bool ImageWatcher
<I
>::handle_payload(const AsyncProgressPayload
&payload
,
1081 C_NotifyAck
*ack_ctx
) {
1082 std::shared_lock l
{m_async_request_lock
};
1083 std::map
<AsyncRequestId
, AsyncRequest
>::iterator req_it
=
1084 m_async_requests
.find(payload
.async_request_id
);
1085 if (req_it
!= m_async_requests
.end()) {
1086 ldout(m_image_ctx
.cct
, 20) << this << " request progress: "
1087 << payload
.async_request_id
<< " @ "
1088 << payload
.offset
<< "/" << payload
.total
1090 schedule_async_request_timed_out(payload
.async_request_id
);
1091 req_it
->second
.second
->update_progress(payload
.offset
, payload
.total
);
1096 template <typename I
>
1097 bool ImageWatcher
<I
>::handle_payload(const AsyncCompletePayload
&payload
,
1098 C_NotifyAck
*ack_ctx
) {
1099 Context
*on_complete
= remove_async_request(payload
.async_request_id
);
1100 if (on_complete
!= nullptr) {
1101 ldout(m_image_ctx
.cct
, 10) << this << " request finished: "
1102 << payload
.async_request_id
<< "="
1103 << payload
.result
<< dendl
;
1104 on_complete
->complete(payload
.result
);
1109 template <typename I
>
1110 bool ImageWatcher
<I
>::handle_payload(const FlattenPayload
&payload
,
1111 C_NotifyAck
*ack_ctx
) {
1112 ldout(m_image_ctx
.cct
, 10) << this << " remote flatten request: "
1113 << payload
.async_request_id
<< dendl
;
1115 return handle_operation_request(
1116 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1117 OPERATION_FLATTEN
, std::bind(&Operations
<I
>::execute_flatten
,
1118 m_image_ctx
.operations
,
1119 std::placeholders::_1
,
1120 std::placeholders::_2
),
1124 template <typename I
>
1125 bool ImageWatcher
<I
>::handle_payload(const ResizePayload
&payload
,
1126 C_NotifyAck
*ack_ctx
) {
1127 ldout(m_image_ctx
.cct
, 10) << this << " remote resize request: "
1128 << payload
.async_request_id
<< " "
1129 << payload
.size
<< " "
1130 << payload
.allow_shrink
<< dendl
;
1132 return handle_operation_request(
1133 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1134 OPERATION_RESIZE
, std::bind(&Operations
<I
>::execute_resize
,
1135 m_image_ctx
.operations
, payload
.size
,
1136 payload
.allow_shrink
, std::placeholders::_1
,
1137 std::placeholders::_2
, 0), ack_ctx
);
1140 template <typename I
>
1141 bool ImageWatcher
<I
>::handle_payload(const SnapCreatePayload
&payload
,
1142 C_NotifyAck
*ack_ctx
) {
1143 ldout(m_image_ctx
.cct
, 10) << this << " remote snap_create request: "
1144 << payload
.async_request_id
<< " "
1145 << payload
.snap_namespace
<< " "
1146 << payload
.snap_name
<< " "
1147 << payload
.flags
<< dendl
;
1149 auto request_type
= exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
;
1151 // rbd-mirror needs to accept forced promotion orphan snap create requests
1152 auto mirror_ns
= std::get_if
<cls::rbd::MirrorSnapshotNamespace
>(
1153 &payload
.snap_namespace
);
1154 if (mirror_ns
!= nullptr && mirror_ns
->is_orphan()) {
1155 request_type
= exclusive_lock::OPERATION_REQUEST_TYPE_FORCE_PROMOTION
;
1158 return handle_operation_request(
1159 payload
.async_request_id
, request_type
,
1160 OPERATION_SNAP_CREATE
, std::bind(&Operations
<I
>::execute_snap_create
,
1161 m_image_ctx
.operations
,
1162 payload
.snap_namespace
,
1163 payload
.snap_name
, std::placeholders::_2
,
1164 0, payload
.flags
, std::placeholders::_1
),
1168 template <typename I
>
1169 bool ImageWatcher
<I
>::handle_payload(const SnapRenamePayload
&payload
,
1170 C_NotifyAck
*ack_ctx
) {
1171 ldout(m_image_ctx
.cct
, 10) << this << " remote snap_rename request: "
1172 << payload
.async_request_id
<< " "
1173 << payload
.snap_id
<< " to "
1174 << payload
.snap_name
<< dendl
;
1176 return handle_operation_request(
1177 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1178 OPERATION_SNAP_RENAME
, std::bind(&Operations
<I
>::execute_snap_rename
,
1179 m_image_ctx
.operations
, payload
.snap_id
,
1181 std::placeholders::_2
), ack_ctx
);
1184 template <typename I
>
1185 bool ImageWatcher
<I
>::handle_payload(const SnapRemovePayload
&payload
,
1186 C_NotifyAck
*ack_ctx
) {
1187 ldout(m_image_ctx
.cct
, 10) << this << " remote snap_remove request: "
1188 << payload
.snap_name
<< dendl
;
1190 auto request_type
= exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
;
1191 if (cls::rbd::get_snap_namespace_type(payload
.snap_namespace
) ==
1192 cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH
) {
1193 request_type
= exclusive_lock::OPERATION_REQUEST_TYPE_TRASH_SNAP_REMOVE
;
1196 return handle_operation_request(
1197 payload
.async_request_id
, request_type
, OPERATION_SNAP_REMOVE
,
1198 std::bind(&Operations
<I
>::execute_snap_remove
, m_image_ctx
.operations
,
1199 payload
.snap_namespace
, payload
.snap_name
,
1200 std::placeholders::_2
), ack_ctx
);
1203 template <typename I
>
1204 bool ImageWatcher
<I
>::handle_payload(const SnapProtectPayload
& payload
,
1205 C_NotifyAck
*ack_ctx
) {
1206 ldout(m_image_ctx
.cct
, 10) << this << " remote snap_protect request: "
1207 << payload
.async_request_id
<< " "
1208 << payload
.snap_name
<< dendl
;
1210 return handle_operation_request(
1211 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1212 OPERATION_SNAP_PROTECT
, std::bind(&Operations
<I
>::execute_snap_protect
,
1213 m_image_ctx
.operations
,
1214 payload
.snap_namespace
,
1216 std::placeholders::_2
), ack_ctx
);
1219 template <typename I
>
1220 bool ImageWatcher
<I
>::handle_payload(const SnapUnprotectPayload
& payload
,
1221 C_NotifyAck
*ack_ctx
) {
1222 ldout(m_image_ctx
.cct
, 10) << this << " remote snap_unprotect request: "
1223 << payload
.async_request_id
<< " "
1224 << payload
.snap_name
<< dendl
;
1226 return handle_operation_request(
1227 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1228 OPERATION_SNAP_UNPROTECT
, std::bind(&Operations
<I
>::execute_snap_unprotect
,
1229 m_image_ctx
.operations
,
1230 payload
.snap_namespace
,
1232 std::placeholders::_2
), ack_ctx
);
1235 template <typename I
>
1236 bool ImageWatcher
<I
>::handle_payload(const RebuildObjectMapPayload
& payload
,
1237 C_NotifyAck
*ack_ctx
) {
1238 ldout(m_image_ctx
.cct
, 10) << this << " remote rebuild object map request: "
1239 << payload
.async_request_id
<< dendl
;
1241 return handle_operation_request(
1242 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1243 OPERATION_REBUILD_OBJECT_MAP
,
1244 std::bind(&Operations
<I
>::execute_rebuild_object_map
,
1245 m_image_ctx
.operations
, std::placeholders::_1
,
1246 std::placeholders::_2
), ack_ctx
);
1249 template <typename I
>
1250 bool ImageWatcher
<I
>::handle_payload(const RenamePayload
& payload
,
1251 C_NotifyAck
*ack_ctx
) {
1252 ldout(m_image_ctx
.cct
, 10) << this << " remote rename request: "
1253 << payload
.async_request_id
<< " "
1254 << payload
.image_name
<< dendl
;
1256 return handle_operation_request(
1257 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1258 OPERATION_RENAME
, std::bind(&Operations
<I
>::execute_rename
,
1259 m_image_ctx
.operations
, payload
.image_name
,
1260 std::placeholders::_2
), ack_ctx
);
1263 template <typename I
>
1264 bool ImageWatcher
<I
>::handle_payload(const UpdateFeaturesPayload
& payload
,
1265 C_NotifyAck
*ack_ctx
) {
1266 ldout(m_image_ctx
.cct
, 10) << this << " remote update_features request: "
1267 << payload
.async_request_id
<< " "
1268 << payload
.features
<< " "
1269 << (payload
.enabled
? "enabled" : "disabled")
1272 return handle_operation_request(
1273 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1274 OPERATION_UPDATE_FEATURES
,
1275 std::bind(&Operations
<I
>::execute_update_features
, m_image_ctx
.operations
,
1276 payload
.features
, payload
.enabled
, std::placeholders::_2
, 0),
1280 template <typename I
>
1281 bool ImageWatcher
<I
>::handle_payload(const MigratePayload
&payload
,
1282 C_NotifyAck
*ack_ctx
) {
1283 ldout(m_image_ctx
.cct
, 10) << this << " remote migrate request: "
1284 << payload
.async_request_id
<< dendl
;
1286 return handle_operation_request(
1287 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1288 OPERATION_MIGRATE
, std::bind(&Operations
<I
>::execute_migrate
,
1289 m_image_ctx
.operations
,
1290 std::placeholders::_1
,
1291 std::placeholders::_2
), ack_ctx
);
1294 template <typename I
>
1295 bool ImageWatcher
<I
>::handle_payload(const SparsifyPayload
&payload
,
1296 C_NotifyAck
*ack_ctx
) {
1297 ldout(m_image_ctx
.cct
, 10) << this << " remote sparsify request: "
1298 << payload
.async_request_id
<< dendl
;
1300 return handle_operation_request(
1301 payload
.async_request_id
, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1302 OPERATION_SPARSIFY
, std::bind(&Operations
<I
>::execute_sparsify
,
1303 m_image_ctx
.operations
,
1304 payload
.sparse_size
, std::placeholders::_1
,
1305 std::placeholders::_2
), ack_ctx
);
1308 template <typename I
>
1309 bool ImageWatcher
<I
>::handle_payload(const MetadataUpdatePayload
&payload
,
1310 C_NotifyAck
*ack_ctx
) {
1311 if (payload
.value
) {
1312 ldout(m_image_ctx
.cct
, 10) << this << " remote metadata_set request: "
1313 << payload
.async_request_id
<< " "
1314 << "key=" << payload
.key
<< ", value="
1315 << *payload
.value
<< dendl
;
1317 return handle_operation_request(
1318 payload
.async_request_id
,
1319 exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1320 OPERATION_METADATA_UPDATE
,
1321 std::bind(&Operations
<I
>::execute_metadata_set
,
1322 m_image_ctx
.operations
, payload
.key
, *payload
.value
,
1323 std::placeholders::_2
),
1326 ldout(m_image_ctx
.cct
, 10) << this << " remote metadata_remove request: "
1327 << payload
.async_request_id
<< " "
1328 << "key=" << payload
.key
<< dendl
;
1330 return handle_operation_request(
1331 payload
.async_request_id
,
1332 exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
,
1333 OPERATION_METADATA_UPDATE
,
1334 std::bind(&Operations
<I
>::execute_metadata_remove
,
1335 m_image_ctx
.operations
, payload
.key
, std::placeholders::_2
),
1340 template <typename I
>
1341 bool ImageWatcher
<I
>::handle_payload(const QuiescePayload
&payload
,
1342 C_NotifyAck
*ack_ctx
) {
1343 auto on_finish
= prepare_quiesce_request(payload
.async_request_id
, ack_ctx
);
1344 if (on_finish
== nullptr) {
1345 ldout(m_image_ctx
.cct
, 10) << this << " duplicate quiesce request: "
1346 << payload
.async_request_id
<< dendl
;
1350 ldout(m_image_ctx
.cct
, 10) << this << " quiesce request: "
1351 << payload
.async_request_id
<< dendl
;
1352 m_image_ctx
.state
->notify_quiesce(on_finish
);
1356 template <typename I
>
1357 bool ImageWatcher
<I
>::handle_payload(const UnquiescePayload
&payload
,
1358 C_NotifyAck
*ack_ctx
) {
1359 ldout(m_image_ctx
.cct
, 10) << this << " unquiesce request: "
1360 << payload
.async_request_id
<< dendl
;
1362 prepare_unquiesce_request(payload
.async_request_id
);
1366 template <typename I
>
1367 bool ImageWatcher
<I
>::handle_payload(const UnknownPayload
&payload
,
1368 C_NotifyAck
*ack_ctx
) {
1369 std::shared_lock l
{m_image_ctx
.owner_lock
};
1370 if (m_image_ctx
.exclusive_lock
!= nullptr) {
1372 if (m_image_ctx
.exclusive_lock
->accept_request(
1373 exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL
, &r
) || r
< 0) {
1374 encode(ResponseMessage(-EOPNOTSUPP
), ack_ctx
->out
);
1380 template <typename I
>
1381 void ImageWatcher
<I
>::process_payload(uint64_t notify_id
, uint64_t handle
,
1383 auto ctx
= new Watcher::C_NotifyAck(this, notify_id
, handle
);
1386 switch (payload
->get_notify_op()) {
1387 case NOTIFY_OP_ACQUIRED_LOCK
:
1388 complete
= handle_payload(*(static_cast<AcquiredLockPayload
*>(payload
)),
1391 case NOTIFY_OP_RELEASED_LOCK
:
1392 complete
= handle_payload(*(static_cast<ReleasedLockPayload
*>(payload
)),
1395 case NOTIFY_OP_REQUEST_LOCK
:
1396 complete
= handle_payload(*(static_cast<RequestLockPayload
*>(payload
)),
1399 case NOTIFY_OP_HEADER_UPDATE
:
1400 complete
= handle_payload(*(static_cast<HeaderUpdatePayload
*>(payload
)),
1403 case NOTIFY_OP_ASYNC_PROGRESS
:
1404 complete
= handle_payload(*(static_cast<AsyncProgressPayload
*>(payload
)),
1407 case NOTIFY_OP_ASYNC_COMPLETE
:
1408 complete
= handle_payload(*(static_cast<AsyncCompletePayload
*>(payload
)),
1411 case NOTIFY_OP_FLATTEN
:
1412 complete
= handle_payload(*(static_cast<FlattenPayload
*>(payload
)), ctx
);
1414 case NOTIFY_OP_RESIZE
:
1415 complete
= handle_payload(*(static_cast<ResizePayload
*>(payload
)), ctx
);
1417 case NOTIFY_OP_SNAP_CREATE
:
1418 complete
= handle_payload(*(static_cast<SnapCreatePayload
*>(payload
)),
1421 case NOTIFY_OP_SNAP_REMOVE
:
1422 complete
= handle_payload(*(static_cast<SnapRemovePayload
*>(payload
)),
1425 case NOTIFY_OP_SNAP_RENAME
:
1426 complete
= handle_payload(*(static_cast<SnapRenamePayload
*>(payload
)),
1429 case NOTIFY_OP_SNAP_PROTECT
:
1430 complete
= handle_payload(*(static_cast<SnapProtectPayload
*>(payload
)),
1433 case NOTIFY_OP_SNAP_UNPROTECT
:
1434 complete
= handle_payload(*(static_cast<SnapUnprotectPayload
*>(payload
)),
1437 case NOTIFY_OP_REBUILD_OBJECT_MAP
:
1438 complete
= handle_payload(*(static_cast<RebuildObjectMapPayload
*>(payload
)),
1441 case NOTIFY_OP_RENAME
:
1442 complete
= handle_payload(*(static_cast<RenamePayload
*>(payload
)), ctx
);
1444 case NOTIFY_OP_UPDATE_FEATURES
:
1445 complete
= handle_payload(*(static_cast<UpdateFeaturesPayload
*>(payload
)),
1448 case NOTIFY_OP_MIGRATE
:
1449 complete
= handle_payload(*(static_cast<MigratePayload
*>(payload
)), ctx
);
1451 case NOTIFY_OP_SPARSIFY
:
1452 complete
= handle_payload(*(static_cast<SparsifyPayload
*>(payload
)), ctx
);
1454 case NOTIFY_OP_QUIESCE
:
1455 complete
= handle_payload(*(static_cast<QuiescePayload
*>(payload
)), ctx
);
1457 case NOTIFY_OP_UNQUIESCE
:
1458 complete
= handle_payload(*(static_cast<UnquiescePayload
*>(payload
)), ctx
);
1460 case NOTIFY_OP_METADATA_UPDATE
:
1461 complete
= handle_payload(*(static_cast<MetadataUpdatePayload
*>(payload
)), ctx
);
1464 ceph_assert(payload
->get_notify_op() == static_cast<NotifyOp
>(-1));
1465 complete
= handle_payload(*(static_cast<UnknownPayload
*>(payload
)), ctx
);
1473 template <typename I
>
1474 void ImageWatcher
<I
>::handle_notify(uint64_t notify_id
, uint64_t handle
,
1475 uint64_t notifier_id
, bufferlist
&bl
) {
1476 NotifyMessage notify_message
;
1477 if (bl
.length() == 0) {
1478 // legacy notification for header updates
1479 notify_message
= NotifyMessage(new HeaderUpdatePayload());
1482 auto iter
= bl
.cbegin();
1483 decode(notify_message
, iter
);
1484 } catch (const buffer::error
&err
) {
1485 lderr(m_image_ctx
.cct
) << this << " error decoding image notification: "
1486 << err
.what() << dendl
;
1491 // if an image refresh is required, refresh before processing the request
1492 if (notify_message
.check_for_refresh() &&
1493 m_image_ctx
.state
->is_refresh_required()) {
1495 m_image_ctx
.state
->refresh(
1496 new C_ProcessPayload(this, notify_id
, handle
,
1497 std::move(notify_message
.payload
)));
1499 process_payload(notify_id
, handle
, notify_message
.payload
.get());
1503 template <typename I
>
1504 void ImageWatcher
<I
>::handle_error(uint64_t handle
, int err
) {
1505 lderr(m_image_ctx
.cct
) << this << " image watch failed: " << handle
<< ", "
1506 << cpp_strerror(err
) << dendl
;
1509 std::lock_guard l
{m_owner_client_id_lock
};
1510 set_owner_client_id(ClientId());
1513 Watcher::handle_error(handle
, err
);
1516 template <typename I
>
1517 void ImageWatcher
<I
>::handle_rewatch_complete(int r
) {
1518 CephContext
*cct
= m_image_ctx
.cct
;
1519 ldout(cct
, 10) << this << " " << __func__
<< ": r=" << r
<< dendl
;
1522 std::shared_lock owner_locker
{m_image_ctx
.owner_lock
};
1523 if (m_image_ctx
.exclusive_lock
!= nullptr) {
1524 // update the lock cookie with the new watch handle
1525 m_image_ctx
.exclusive_lock
->reacquire_lock(nullptr);
1529 // image might have been updated while we didn't have active watch
1530 handle_payload(HeaderUpdatePayload(), nullptr);
1533 template <typename I
>
1534 void ImageWatcher
<I
>::send_notify(Payload
*payload
, Context
*ctx
) {
1537 encode(NotifyMessage(payload
), bl
);
1538 Watcher::send_notify(bl
, nullptr, ctx
);
1541 template <typename I
>
1542 void ImageWatcher
<I
>::RemoteContext::finish(int r
) {
1543 m_image_watcher
.schedule_async_complete(m_async_request_id
, r
);
1546 template <typename I
>
1547 void ImageWatcher
<I
>::C_ResponseMessage::finish(int r
) {
1548 CephContext
*cct
= notify_ack
->cct
;
1549 ldout(cct
, 10) << this << " C_ResponseMessage: r=" << r
<< dendl
;
1551 encode(ResponseMessage(r
), notify_ack
->out
);
1552 notify_ack
->complete(0);
1555 } // namespace librbd
1557 template class librbd::ImageWatcher
<librbd::ImageCtx
>;