1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
5 #define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
13 #include "common/AsyncOpTracker.h"
14 #include "librbd/Watcher.h"
15 #include "librbd/managed_lock/Types.h"
16 #include "tools/rbd_mirror/instance_watcher/Types.h"
// Forward declarations of the class templates that this header only names.
// The data members declared further down refer to them through pointers
// (m_instance_lock, m_instance_replayer, m_image_sync_throttler, m_threads).
22 template <typename
> class ManagedLock
;
29 template <typename
> class InstanceReplayer
;
30 template <typename
> class Throttler
;
31 template <typename
> struct Threads
;
33 template <typename ImageCtxT
= librbd::ImageCtx
>
34 class InstanceWatcher
: protected librbd::Watcher
{
35 using librbd::Watcher::unregister_watch
; // Silence overloaded virtual warning
37 static void get_instances(librados::IoCtx
&io_ctx
,
38 std::vector
<std::string
> *instance_ids
,
40 static void remove_instance(librados::IoCtx
&io_ctx
,
41 librbd::AsioEngine
& asio_engine
,
42 const std::string
&instance_id
,
// Static factory that returns a pointer to an InstanceWatcher.
// It takes the same first four arguments as the constructor (io_ctx,
// asio_engine, instance_replayer, image_sync_throttler) but no instance id.
// NOTE(review): how the instance id is chosen and who owns/destroys the
// returned object is not visible in this declaration - confirm in the
// definition.
45 static InstanceWatcher
*create(
46 librados::IoCtx
&io_ctx
, librbd::AsioEngine
& asio_engine
,
47 InstanceReplayer
<ImageCtxT
> *instance_replayer
,
48 Throttler
<ImageCtxT
> *image_sync_throttler
);
// Constructor: takes the IoCtx and AsioEngine by reference, the instance
// replayer and image-sync throttler as raw pointers, and the id of this
// instance as a string.
// NOTE(review): presumably the pointers and id are stored in
// m_instance_replayer / m_image_sync_throttler / m_instance_id - confirm in
// the definition.
53 InstanceWatcher(librados::IoCtx
&io_ctx
, librbd::AsioEngine
& asio_engine
,
54 InstanceReplayer
<ImageCtxT
> *instance_replayer
,
55 Throttler
<ImageCtxT
> *image_sync_throttler
,
56 const std::string
&instance_id
);
// Destructor; `override` means it overrides a virtual destructor of a base
// class.
57 ~InstanceWatcher() override
;
59 inline std::string
&get_instance_id() {
// Lifecycle entry points; each takes a Context* named on_finish.
// NOTE(review): presumably on_finish is completed once the corresponding
// sequence from the state diagram below has finished - confirm against the
// definitions.
66 void init(Context
*on_finish
);
67 void shut_down(Context
*on_finish
);
68 void remove(Context
*on_finish
);
// Image notifications. Each call names a target by instance_id and an image
// by global_image_id, and takes an on_notify_ack Context*.
// NOTE(review): presumably on_notify_ack is completed when the target
// instance acknowledges the notification - confirm in the definitions.
70 void notify_image_acquire(const std::string
&instance_id
,
71 const std::string
&global_image_id
,
72 Context
*on_notify_ack
);
73 void notify_image_release(const std::string
&instance_id
,
74 const std::string
&global_image_id
,
75 Context
*on_notify_ack
);
// Same shape as the two calls above, with an additional peer_mirror_uuid.
76 void notify_peer_image_removed(const std::string
&instance_id
,
77 const std::string
&global_image_id
,
78 const std::string
&peer_mirror_uuid
,
79 Context
*on_notify_ack
);
// Image-sync request API; every call is keyed by a sync_id string.
// notify_sync_request additionally takes an on_sync_start Context*.
// NOTE(review): presumably on_sync_start is completed once the sync is
// allowed to start - confirm in the definition.
81 void notify_sync_request(const std::string
&sync_id
, Context
*on_sync_start
);
// Returns bool. NOTE(review): presumably true when a matching request was
// found and cancelled - confirm in the definition.
82 bool cancel_sync_request(const std::string
&sync_id
);
83 void notify_sync_complete(const std::string
&sync_id
);
// Takes the id of an instance. NOTE(review): presumably cancels the pending
// notify requests addressed to that instance - confirm in the definition.
85 void cancel_notify_requests(const std::string
&instance_id
);
// Leader-change hooks. handle_update_leader receives the id of the leader
// instance; is_leader() compares m_leader_instance_id with m_instance_id.
// NOTE(review): who calls these hooks is not visible in this header.
87 void handle_acquire_leader();
88 void handle_release_leader();
89 void handle_update_leader(const std::string
&leader_instance_id
);
95 * BREAK_INSTANCE_LOCK -------\
98 * GET_INSTANCE_LOCKER * * *>|
101 * <uninitialized> <----------------+---- WAIT_FOR_NOTIFY_OPS
104 * REGISTER_INSTANCE * * * * * *|* *> UNREGISTER_INSTANCE
107 * CREATE_INSTANCE_OBJECT * * * * * *> REMOVE_INSTANCE_OBJECT
110 * REGISTER_WATCH * * * * * * * * * *> UNREGISTER_WATCH
113 * ACQUIRE_LOCK * * * * * * * * * * * RELEASE_LOCK
116 * <watching> -------------------------------/
// Nested helper types that are only declared here. Pointers to
// C_NotifyInstanceRequest are kept in m_notify_ops and m_suspended_ops;
// pointers to C_SyncRequest are kept in m_inflight_sync_reqs.
121 struct C_NotifyInstanceRequest
;
122 struct C_SyncRequest
;
// Id is a pair of strings.
// NOTE(review): the meaning of the two components is not visible here.
124 typedef std::pair
<std::string
, std::string
> Id
;
126 struct HandlePayloadVisitor
: public boost::static_visitor
<void> {
127 InstanceWatcher
*instance_watcher
;
128 std::string instance_id
;
129 C_NotifyAck
*on_notify_ack
;
131 HandlePayloadVisitor(InstanceWatcher
*instance_watcher
,
132 const std::string
&instance_id
,
133 C_NotifyAck
*on_notify_ack
)
134 : instance_watcher(instance_watcher
), instance_id(instance_id
),
135 on_notify_ack(on_notify_ack
) {
138 template <typename Payload
>
139 inline void operator()(const Payload
&payload
) const {
140 instance_watcher
->handle_payload(instance_id
, payload
, on_notify_ack
);
145 std::string instance_id
;
147 C_NotifyAck
*on_notify_ack
= nullptr;
149 Request(const std::string
&instance_id
, uint64_t request_id
)
150 : instance_id(instance_id
), request_id(request_id
) {
153 inline bool operator<(const Request
&rhs
) const {
154 return instance_id
< rhs
.instance_id
||
155 (instance_id
== rhs
.instance_id
&& request_id
< rhs
.request_id
);
// Collaborators, held as raw pointers.
// NOTE(review): ownership and lifetime are not visible in this header.
159 Threads
<ImageCtxT
> *m_threads
;
160 InstanceReplayer
<ImageCtxT
> *m_instance_replayer
;
161 Throttler
<ImageCtxT
> *m_image_sync_throttler
;
// Id of this instance; is_leader() compares it with m_leader_instance_id.
162 std::string m_instance_id
;
// Declared mutable, so it can also be locked from const member functions.
// NOTE(review): which members it guards is not stated here - confirm in the
// definitions.
164 mutable ceph::mutex m_lock
;
165 librbd::ManagedLock
<ImageCtxT
> *m_instance_lock
;
// Starts out null.
// NOTE(review): presumably holds the on_finish of the init/shut_down/remove
// call in progress - confirm.
166 Context
*m_on_finish
= nullptr;
168 std::string m_leader_instance_id
;
169 librbd::managed_lock::Locker m_instance_locker
;
// Set of (string, C_NotifyInstanceRequest*) pairs.
// NOTE(review): presumably the string is the target instance id - confirm.
170 std::set
<std::pair
<std::string
, C_NotifyInstanceRequest
*>> m_notify_ops
;
171 AsyncOpTracker m_notify_op_tracker
;
// Counter starting at 0.
// NOTE(review): presumably used to generate request ids - confirm.
172 uint64_t m_request_seq
= 0;
// Ordered by Request::operator<, i.e. by instance_id and then request_id.
173 std::set
<Request
> m_requests
;
174 std::set
<C_NotifyInstanceRequest
*> m_suspended_ops
;
// Maps a string key to a C_SyncRequest*.
// NOTE(review): presumably the key is the sync_id - confirm.
175 std::map
<std::string
, C_SyncRequest
*> m_inflight_sync_reqs
;
177 inline bool is_leader() const {
178 return m_leader_instance_id
== m_instance_id
;
// Step functions whose names match the states in the diagram above
// (REGISTER_INSTANCE, CREATE_INSTANCE_OBJECT, REGISTER_WATCH, ...). Each step
// is paired with a handle_<step>(int r) declaration.
// NOTE(review): r is presumably the result code of the completed step
// (negative on error) - confirm in the definitions.
181 void register_instance();
182 void handle_register_instance(int r
);
184 void create_instance_object();
185 void handle_create_instance_object(int r
);
187 void register_watch();
188 void handle_register_watch(int r
);
// Only the handle_ halves of the ACQUIRE_LOCK / RELEASE_LOCK states appear in
// this span.
191 void handle_acquire_lock(int r
);
194 void handle_release_lock(int r
);
196 void unregister_watch();
197 void handle_unregister_watch(int r
);
199 void remove_instance_object();
200 void handle_remove_instance_object(int r
);
202 void unregister_instance();
203 void handle_unregister_instance(int r
);
205 void wait_for_notify_ops();
206 void handle_wait_for_notify_ops(int r
);
208 void get_instance_locker();
209 void handle_get_instance_locker(int r
);
211 void break_instance_lock();
212 void handle_break_instance_lock(int r
);
// Suspension of notify requests.
// NOTE(review): presumably suspended requests are tracked in m_suspended_ops,
// and the bool result says whether the request had been suspended - confirm.
214 void suspend_notify_request(C_NotifyInstanceRequest
*req
);
215 bool unsuspend_notify_request(C_NotifyInstanceRequest
*req
);
216 void unsuspend_notify_requests();
// Overload of notify_sync_complete(sync_id) that also takes a ceph::mutex
// reference.
// NOTE(review): presumably the extra argument signals that the caller already
// holds m_lock - confirm in the definition.
218 void notify_sync_complete(const ceph::mutex
& lock
, const std::string
&sync_id
);
// Handlers that receive the C_SyncRequest and an int r.
// NOTE(review): r is presumably a result code - confirm.
219 void handle_notify_sync_request(C_SyncRequest
*sync_ctx
, int r
);
220 void handle_notify_sync_complete(C_SyncRequest
*sync_ctx
, int r
);
// Takes a target instance_id and a sync_id.
222 void notify_sync_start(const std::string
&instance_id
,
223 const std::string
&sync_id
);
// Takes the (instance_id, request_id) pair that identifies a Request, plus the
// C_NotifyAck for the incoming notification, and returns a Context*.
// NOTE(review): presumably the request is recorded in m_requests and the
// returned Context is null when the request is a duplicate - confirm in the
// definition.
225 Context
*prepare_request(const std::string
&instance_id
, uint64_t request_id
,
226 C_NotifyAck
*on_notify_ack
);
227 void complete_request(const std::string
&instance_id
, uint64_t request_id
,
// Overrides a virtual function of a base class (`override`). Receives the
// notify id, the watch handle, the notifier id and the notification payload
// as a bufferlist.
// NOTE(review): presumably decodes bl and dispatches the payload through
// HandlePayloadVisitor - confirm in the definition.
230 void handle_notify(uint64_t notify_id
, uint64_t handle
,
231 uint64_t notifier_id
, bufferlist
&bl
) override
;
233 void handle_image_acquire(const std::string
&global_image_id
,
235 void handle_image_release(const std::string
&global_image_id
,
237 void handle_peer_image_removed(const std::string
&global_image_id
,
238 const std::string
&peer_mirror_uuid
,
// Sync handlers; each takes an instance_id, a sync_id and an on_finish
// Context*.
// NOTE(review): presumably invoked from the SyncRequestPayload /
// SyncStartPayload handle_payload overloads - confirm in the definitions.
241 void handle_sync_request(const std::string
&instance_id
,
242 const std::string
&sync_id
, Context
*on_finish
);
243 void handle_sync_start(const std::string
&instance_id
,
244 const std::string
&sync_id
, Context
*on_finish
);
// One handle_payload overload per payload type in the instance_watcher
// namespace. HandlePayloadVisitor::operator() calls
// handle_payload(instance_id, payload, on_notify_ack), so overload resolution
// on the payload type selects which of these runs.
246 void handle_payload(const std::string
&instance_id
,
247 const instance_watcher::ImageAcquirePayload
&payload
,
248 C_NotifyAck
*on_notify_ack
);
249 void handle_payload(const std::string
&instance_id
,
250 const instance_watcher::ImageReleasePayload
&payload
,
251 C_NotifyAck
*on_notify_ack
);
252 void handle_payload(const std::string
&instance_id
,
253 const instance_watcher::PeerImageRemovedPayload
&payload
,
254 C_NotifyAck
*on_notify_ack
);
255 void handle_payload(const std::string
&instance_id
,
256 const instance_watcher::SyncRequestPayload
&payload
,
257 C_NotifyAck
*on_notify_ack
);
258 void handle_payload(const std::string
&instance_id
,
259 const instance_watcher::SyncStartPayload
&payload
,
260 C_NotifyAck
*on_notify_ack
);
// Overload for the UnknownPayload type.
// NOTE(review): presumably reached for unrecognized notification types -
// confirm in the definition.
261 void handle_payload(const std::string
&instance_id
,
262 const instance_watcher::UnknownPayload
&payload
,
263 C_NotifyAck
*on_notify_ack
);
266 } // namespace mirror
269 #endif // CEPH_RBD_MIRROR_INSTANCE_WATCHER_H