]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "InstanceWatcher.h" | |
7c673cae FG |
5 | #include "include/stringify.h" |
6 | #include "common/debug.h" | |
7 | #include "common/errno.h" | |
8 | #include "cls/rbd/cls_rbd_client.h" | |
9 | #include "librbd/ManagedLock.h" | |
10 | #include "librbd/Utils.h" | |
11 | #include "InstanceReplayer.h" | |
9f95a23c | 12 | #include "Throttler.h" |
11fdf7f2 | 13 | #include "common/Cond.h" |
7c673cae FG |
14 | |
15 | #define dout_context g_ceph_context | |
16 | #define dout_subsys ceph_subsys_rbd_mirror | |
17 | #undef dout_prefix | |
18 | #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " | |
19 | ||
20 | namespace rbd { | |
21 | namespace mirror { | |
22 | ||
23 | using namespace instance_watcher; | |
24 | ||
25 | using librbd::util::create_async_context_callback; | |
26 | using librbd::util::create_context_callback; | |
27 | using librbd::util::create_rados_callback; | |
28 | using librbd::util::unique_lock_name; | |
29 | ||
30 | namespace { | |
31 | ||
32 | struct C_GetInstances : public Context { | |
33 | std::vector<std::string> *instance_ids; | |
34 | Context *on_finish; | |
35 | bufferlist out_bl; | |
36 | ||
37 | C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish) | |
38 | : instance_ids(instance_ids), on_finish(on_finish) { | |
39 | } | |
40 | ||
41 | void finish(int r) override { | |
11fdf7f2 | 42 | dout(10) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r |
7c673cae FG |
43 | << dendl; |
44 | ||
45 | if (r == 0) { | |
11fdf7f2 | 46 | auto it = out_bl.cbegin(); |
7c673cae FG |
47 | r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids); |
48 | } else if (r == -ENOENT) { | |
49 | r = 0; | |
50 | } | |
51 | on_finish->complete(r); | |
52 | } | |
53 | }; | |
54 | ||
55 | template <typename I> | |
56 | struct C_RemoveInstanceRequest : public Context { | |
57 | InstanceWatcher<I> instance_watcher; | |
58 | Context *on_finish; | |
59 | ||
60 | C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue, | |
61 | const std::string &instance_id, Context *on_finish) | |
9f95a23c | 62 | : instance_watcher(io_ctx, work_queue, nullptr, nullptr, instance_id), |
7c673cae FG |
63 | on_finish(on_finish) { |
64 | } | |
65 | ||
66 | void send() { | |
11fdf7f2 | 67 | dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl; |
7c673cae FG |
68 | |
69 | instance_watcher.remove(this); | |
70 | } | |
71 | ||
72 | void finish(int r) override { | |
11fdf7f2 | 73 | dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r=" |
7c673cae | 74 | << r << dendl; |
11fdf7f2 | 75 | ceph_assert(r == 0); |
7c673cae FG |
76 | |
77 | on_finish->complete(r); | |
78 | } | |
79 | }; | |
80 | ||
81 | } // anonymous namespace | |
82 | ||
83 | template <typename I> | |
84 | struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context { | |
85 | InstanceWatcher<I> *instance_watcher; | |
7c673cae FG |
86 | std::string instance_id; |
87 | uint64_t request_id; | |
88 | bufferlist bl; | |
89 | Context *on_finish; | |
31f18b77 FG |
90 | bool send_to_leader; |
91 | std::unique_ptr<librbd::watcher::Notifier> notifier; | |
7c673cae | 92 | librbd::watcher::NotifyResponse response; |
31f18b77 | 93 | bool canceling = false; |
7c673cae FG |
94 | |
95 | C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher, | |
96 | const std::string &instance_id, uint64_t request_id, | |
97 | bufferlist &&bl, Context *on_finish) | |
31f18b77 FG |
98 | : instance_watcher(instance_watcher), instance_id(instance_id), |
99 | request_id(request_id), bl(bl), on_finish(on_finish), | |
100 | send_to_leader(instance_id.empty()) { | |
11fdf7f2 | 101 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
31f18b77 FG |
102 | << ": instance_watcher=" << instance_watcher << ", instance_id=" |
103 | << instance_id << ", request_id=" << request_id << dendl; | |
104 | ||
9f95a23c | 105 | ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); |
31f18b77 FG |
106 | |
107 | if (!send_to_leader) { | |
11fdf7f2 | 108 | ceph_assert((!instance_id.empty())); |
31f18b77 FG |
109 | notifier.reset(new librbd::watcher::Notifier( |
110 | instance_watcher->m_work_queue, | |
111 | instance_watcher->m_ioctx, | |
112 | RBD_MIRROR_INSTANCE_PREFIX + instance_id)); | |
113 | } | |
114 | ||
115 | instance_watcher->m_notify_op_tracker.start_op(); | |
7c673cae FG |
116 | auto result = instance_watcher->m_notify_ops.insert( |
117 | std::make_pair(instance_id, this)).second; | |
11fdf7f2 | 118 | ceph_assert(result); |
7c673cae FG |
119 | } |
120 | ||
121 | void send() { | |
11fdf7f2 | 122 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; |
7c673cae | 123 | |
9f95a23c | 124 | ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); |
31f18b77 FG |
125 | |
126 | if (canceling) { | |
11fdf7f2 | 127 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
31f18b77 FG |
128 | << ": canceling" << dendl; |
129 | instance_watcher->m_work_queue->queue(this, -ECANCELED); | |
130 | return; | |
131 | } | |
132 | ||
133 | if (send_to_leader) { | |
134 | if (instance_watcher->m_leader_instance_id.empty()) { | |
11fdf7f2 | 135 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
31f18b77 FG |
136 | << ": suspending" << dendl; |
137 | instance_watcher->suspend_notify_request(this); | |
138 | return; | |
139 | } | |
140 | ||
141 | if (instance_watcher->m_leader_instance_id != instance_id) { | |
142 | auto count = instance_watcher->m_notify_ops.erase( | |
143 | std::make_pair(instance_id, this)); | |
11fdf7f2 | 144 | ceph_assert(count > 0); |
31f18b77 FG |
145 | |
146 | instance_id = instance_watcher->m_leader_instance_id; | |
147 | ||
148 | auto result = instance_watcher->m_notify_ops.insert( | |
149 | std::make_pair(instance_id, this)).second; | |
11fdf7f2 | 150 | ceph_assert(result); |
31f18b77 FG |
151 | |
152 | notifier.reset(new librbd::watcher::Notifier( | |
153 | instance_watcher->m_work_queue, | |
154 | instance_watcher->m_ioctx, | |
155 | RBD_MIRROR_INSTANCE_PREFIX + instance_id)); | |
156 | } | |
157 | } | |
158 | ||
11fdf7f2 | 159 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
d2e6a577 | 160 | << ": sending to " << instance_id << dendl; |
31f18b77 | 161 | notifier->notify(bl, &response, this); |
7c673cae FG |
162 | } |
163 | ||
164 | void cancel() { | |
11fdf7f2 | 165 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; |
7c673cae | 166 | |
9f95a23c | 167 | ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); |
31f18b77 FG |
168 | |
169 | canceling = true; | |
170 | instance_watcher->unsuspend_notify_request(this); | |
7c673cae FG |
171 | } |
172 | ||
173 | void finish(int r) override { | |
11fdf7f2 | 174 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r=" |
7c673cae FG |
175 | << r << dendl; |
176 | ||
177 | if (r == 0 || r == -ETIMEDOUT) { | |
178 | bool found = false; | |
179 | for (auto &it : response.acks) { | |
180 | auto &bl = it.second; | |
181 | if (it.second.length() == 0) { | |
11fdf7f2 TL |
182 | dout(5) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
183 | << ": no payload in ack, ignoring" << dendl; | |
7c673cae FG |
184 | continue; |
185 | } | |
186 | try { | |
11fdf7f2 | 187 | auto iter = bl.cbegin(); |
7c673cae | 188 | NotifyAckPayload ack; |
11fdf7f2 | 189 | decode(ack, iter); |
7c673cae FG |
190 | if (ack.instance_id != instance_watcher->get_instance_id()) { |
191 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
192 | << ": ack instance_id (" << ack.instance_id << ") " | |
193 | << "does not match, ignoring" << dendl; | |
194 | continue; | |
195 | } | |
196 | if (ack.request_id != request_id) { | |
197 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
198 | << ": ack request_id (" << ack.request_id << ") " | |
199 | << "does not match, ignoring" << dendl; | |
200 | continue; | |
201 | } | |
202 | r = ack.ret_val; | |
203 | found = true; | |
204 | break; | |
205 | } catch (const buffer::error &err) { | |
206 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
207 | << ": failed to decode ack: " << err.what() << dendl; | |
208 | continue; | |
209 | } | |
210 | } | |
211 | ||
212 | if (!found) { | |
213 | if (r == -ETIMEDOUT) { | |
31f18b77 FG |
214 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ |
215 | << ": resending after timeout" << dendl; | |
9f95a23c | 216 | std::lock_guard locker{instance_watcher->m_lock}; |
31f18b77 FG |
217 | send(); |
218 | return; | |
7c673cae FG |
219 | } else { |
220 | r = -EINVAL; | |
221 | } | |
31f18b77 FG |
222 | } else { |
223 | if (r == -ESTALE && send_to_leader) { | |
224 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
225 | << ": resending due to leader change" << dendl; | |
9f95a23c | 226 | std::lock_guard locker{instance_watcher->m_lock}; |
31f18b77 FG |
227 | send(); |
228 | return; | |
229 | } | |
7c673cae FG |
230 | } |
231 | } | |
232 | ||
7c673cae FG |
233 | on_finish->complete(r); |
234 | ||
31f18b77 | 235 | { |
9f95a23c | 236 | std::lock_guard locker{instance_watcher->m_lock}; |
31f18b77 | 237 | auto result = instance_watcher->m_notify_ops.erase( |
7c673cae | 238 | std::make_pair(instance_id, this)); |
11fdf7f2 | 239 | ceph_assert(result > 0); |
31f18b77 FG |
240 | instance_watcher->m_notify_op_tracker.finish_op(); |
241 | } | |
242 | ||
7c673cae FG |
243 | delete this; |
244 | } | |
245 | ||
246 | void complete(int r) override { | |
247 | finish(r); | |
248 | } | |
249 | }; | |
250 | ||
31f18b77 FG |
251 | template <typename I> |
252 | struct InstanceWatcher<I>::C_SyncRequest : public Context { | |
253 | InstanceWatcher<I> *instance_watcher; | |
254 | std::string sync_id; | |
255 | Context *on_start; | |
256 | Context *on_complete = nullptr; | |
257 | C_NotifyInstanceRequest *req = nullptr; | |
258 | ||
259 | C_SyncRequest(InstanceWatcher<I> *instance_watcher, | |
260 | const std::string &sync_id, Context *on_start) | |
261 | : instance_watcher(instance_watcher), sync_id(sync_id), | |
262 | on_start(on_start) { | |
11fdf7f2 | 263 | dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id=" |
31f18b77 FG |
264 | << sync_id << dendl; |
265 | } | |
266 | ||
267 | void finish(int r) override { | |
11fdf7f2 | 268 | dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": r=" |
31f18b77 FG |
269 | << r << dendl; |
270 | ||
271 | if (on_start != nullptr) { | |
272 | instance_watcher->handle_notify_sync_request(this, r); | |
273 | } else { | |
274 | instance_watcher->handle_notify_sync_complete(this, r); | |
275 | delete this; | |
276 | } | |
277 | } | |
278 | ||
279 | // called twice | |
280 | void complete(int r) override { | |
281 | finish(r); | |
282 | } | |
283 | }; | |
284 | ||
7c673cae FG |
285 | #undef dout_prefix |
286 | #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ | |
287 | << this << " " << __func__ << ": " | |
288 | template <typename I> | |
289 | void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx, | |
290 | std::vector<std::string> *instance_ids, | |
291 | Context *on_finish) { | |
292 | librados::ObjectReadOperation op; | |
293 | librbd::cls_client::mirror_instances_list_start(&op); | |
294 | C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish); | |
295 | librados::AioCompletion *aio_comp = create_rados_callback(ctx); | |
296 | ||
297 | int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl); | |
11fdf7f2 | 298 | ceph_assert(r == 0); |
7c673cae FG |
299 | aio_comp->release(); |
300 | } | |
301 | ||
302 | template <typename I> | |
303 | void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx, | |
304 | ContextWQ *work_queue, | |
305 | const std::string &instance_id, | |
306 | Context *on_finish) { | |
307 | auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id, | |
308 | on_finish); | |
309 | req->send(); | |
310 | } | |
311 | ||
312 | template <typename I> | |
313 | InstanceWatcher<I> *InstanceWatcher<I>::create( | |
314 | librados::IoCtx &io_ctx, ContextWQ *work_queue, | |
9f95a23c TL |
315 | InstanceReplayer<I> *instance_replayer, |
316 | Throttler<I> *image_sync_throttler) { | |
7c673cae | 317 | return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer, |
9f95a23c | 318 | image_sync_throttler, |
7c673cae FG |
319 | stringify(io_ctx.get_instance_id())); |
320 | } | |
321 | ||
322 | template <typename I> | |
323 | InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx, | |
324 | ContextWQ *work_queue, | |
325 | InstanceReplayer<I> *instance_replayer, | |
9f95a23c | 326 | Throttler<I> *image_sync_throttler, |
7c673cae FG |
327 | const std::string &instance_id) |
328 | : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id), | |
9f95a23c TL |
329 | m_instance_replayer(instance_replayer), |
330 | m_image_sync_throttler(image_sync_throttler), m_instance_id(instance_id), | |
331 | m_lock(ceph::make_mutex( | |
332 | unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this))), | |
7c673cae FG |
333 | m_instance_lock(librbd::ManagedLock<I>::create( |
334 | m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true, | |
11fdf7f2 | 335 | m_cct->_conf.get_val<uint64_t>("rbd_blacklist_expire_seconds"))) { |
7c673cae FG |
336 | } |
337 | ||
338 | template <typename I> | |
339 | InstanceWatcher<I>::~InstanceWatcher() { | |
11fdf7f2 TL |
340 | ceph_assert(m_requests.empty()); |
341 | ceph_assert(m_notify_ops.empty()); | |
342 | ceph_assert(m_notify_op_tracker.empty()); | |
343 | ceph_assert(m_suspended_ops.empty()); | |
344 | ceph_assert(m_inflight_sync_reqs.empty()); | |
7c673cae FG |
345 | m_instance_lock->destroy(); |
346 | } | |
347 | ||
348 | template <typename I> | |
349 | int InstanceWatcher<I>::init() { | |
350 | C_SaferCond init_ctx; | |
351 | init(&init_ctx); | |
352 | return init_ctx.wait(); | |
353 | } | |
354 | ||
355 | template <typename I> | |
356 | void InstanceWatcher<I>::init(Context *on_finish) { | |
11fdf7f2 | 357 | dout(10) << "instance_id=" << m_instance_id << dendl; |
7c673cae | 358 | |
9f95a23c | 359 | std::lock_guard locker{m_lock}; |
7c673cae | 360 | |
11fdf7f2 | 361 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
362 | m_on_finish = on_finish; |
363 | m_ret_val = 0; | |
364 | ||
365 | register_instance(); | |
366 | } | |
367 | ||
368 | template <typename I> | |
369 | void InstanceWatcher<I>::shut_down() { | |
370 | C_SaferCond shut_down_ctx; | |
371 | shut_down(&shut_down_ctx); | |
372 | int r = shut_down_ctx.wait(); | |
11fdf7f2 | 373 | ceph_assert(r == 0); |
7c673cae FG |
374 | } |
375 | ||
376 | template <typename I> | |
377 | void InstanceWatcher<I>::shut_down(Context *on_finish) { | |
11fdf7f2 | 378 | dout(10) << dendl; |
7c673cae | 379 | |
9f95a23c | 380 | std::lock_guard locker{m_lock}; |
7c673cae | 381 | |
11fdf7f2 | 382 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
383 | m_on_finish = on_finish; |
384 | m_ret_val = 0; | |
385 | ||
386 | release_lock(); | |
387 | } | |
388 | ||
389 | template <typename I> | |
390 | void InstanceWatcher<I>::remove(Context *on_finish) { | |
11fdf7f2 | 391 | dout(10) << dendl; |
7c673cae | 392 | |
9f95a23c | 393 | std::lock_guard locker{m_lock}; |
7c673cae | 394 | |
11fdf7f2 | 395 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
396 | m_on_finish = on_finish; |
397 | m_ret_val = 0; | |
7c673cae FG |
398 | |
399 | get_instance_locker(); | |
400 | } | |
401 | ||
402 | template <typename I> | |
403 | void InstanceWatcher<I>::notify_image_acquire( | |
404 | const std::string &instance_id, const std::string &global_image_id, | |
d2e6a577 | 405 | Context *on_notify_ack) { |
11fdf7f2 | 406 | dout(10) << "instance_id=" << instance_id << ", global_image_id=" |
7c673cae FG |
407 | << global_image_id << dendl; |
408 | ||
9f95a23c | 409 | std::lock_guard locker{m_lock}; |
7c673cae | 410 | |
11fdf7f2 | 411 | ceph_assert(m_on_finish == nullptr); |
7c673cae | 412 | |
11fdf7f2 TL |
413 | uint64_t request_id = ++m_request_seq; |
414 | bufferlist bl; | |
415 | encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}}, bl); | |
416 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
417 | std::move(bl), on_notify_ack); | |
418 | req->send(); | |
7c673cae FG |
419 | } |
420 | ||
421 | template <typename I> | |
422 | void InstanceWatcher<I>::notify_image_release( | |
d2e6a577 FG |
423 | const std::string &instance_id, const std::string &global_image_id, |
424 | Context *on_notify_ack) { | |
11fdf7f2 | 425 | dout(10) << "instance_id=" << instance_id << ", global_image_id=" |
7c673cae FG |
426 | << global_image_id << dendl; |
427 | ||
9f95a23c | 428 | std::lock_guard locker{m_lock}; |
7c673cae | 429 | |
11fdf7f2 | 430 | ceph_assert(m_on_finish == nullptr); |
7c673cae | 431 | |
11fdf7f2 TL |
432 | uint64_t request_id = ++m_request_seq; |
433 | bufferlist bl; | |
434 | encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}}, bl); | |
435 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
436 | std::move(bl), on_notify_ack); | |
437 | req->send(); | |
d2e6a577 FG |
438 | } |
439 | ||
440 | template <typename I> | |
441 | void InstanceWatcher<I>::notify_peer_image_removed( | |
442 | const std::string &instance_id, const std::string &global_image_id, | |
443 | const std::string &peer_mirror_uuid, Context *on_notify_ack) { | |
11fdf7f2 | 444 | dout(10) << "instance_id=" << instance_id << ", " |
d2e6a577 FG |
445 | << "global_image_id=" << global_image_id << ", " |
446 | << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; | |
447 | ||
9f95a23c | 448 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 449 | ceph_assert(m_on_finish == nullptr); |
d2e6a577 | 450 | |
11fdf7f2 TL |
451 | uint64_t request_id = ++m_request_seq; |
452 | bufferlist bl; | |
453 | encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id, | |
454 | peer_mirror_uuid}}, bl); | |
455 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
456 | std::move(bl), on_notify_ack); | |
457 | req->send(); | |
7c673cae FG |
458 | } |
459 | ||
31f18b77 FG |
460 | template <typename I> |
461 | void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id, | |
462 | Context *on_sync_start) { | |
11fdf7f2 | 463 | dout(10) << "sync_id=" << sync_id << dendl; |
31f18b77 | 464 | |
9f95a23c | 465 | std::lock_guard locker{m_lock}; |
31f18b77 | 466 | |
11fdf7f2 | 467 | ceph_assert(m_inflight_sync_reqs.count(sync_id) == 0); |
31f18b77 FG |
468 | |
469 | uint64_t request_id = ++m_request_seq; | |
470 | ||
471 | bufferlist bl; | |
11fdf7f2 | 472 | encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl); |
31f18b77 FG |
473 | |
474 | auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start); | |
475 | sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id, | |
476 | std::move(bl), sync_ctx); | |
477 | ||
478 | m_inflight_sync_reqs[sync_id] = sync_ctx; | |
479 | sync_ctx->req->send(); | |
480 | } | |
481 | ||
482 | template <typename I> | |
483 | bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) { | |
11fdf7f2 | 484 | dout(10) << "sync_id=" << sync_id << dendl; |
31f18b77 | 485 | |
9f95a23c | 486 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
487 | |
488 | auto it = m_inflight_sync_reqs.find(sync_id); | |
489 | if (it == m_inflight_sync_reqs.end()) { | |
490 | return false; | |
491 | } | |
492 | ||
493 | auto sync_ctx = it->second; | |
494 | ||
495 | if (sync_ctx->on_start == nullptr) { | |
496 | return false; | |
497 | } | |
498 | ||
11fdf7f2 | 499 | ceph_assert(sync_ctx->req != nullptr); |
31f18b77 FG |
500 | sync_ctx->req->cancel(); |
501 | return true; | |
502 | } | |
503 | ||
504 | template <typename I> | |
505 | void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id, | |
506 | const std::string &sync_id) { | |
11fdf7f2 | 507 | dout(10) << "sync_id=" << sync_id << dendl; |
31f18b77 | 508 | |
9f95a23c | 509 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
510 | |
511 | uint64_t request_id = ++m_request_seq; | |
512 | ||
513 | bufferlist bl; | |
11fdf7f2 | 514 | encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl); |
31f18b77 | 515 | |
9f95a23c | 516 | auto ctx = new LambdaContext( |
31f18b77 | 517 | [this, sync_id] (int r) { |
11fdf7f2 | 518 | dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl; |
9f95a23c TL |
519 | std::lock_guard locker{m_lock}; |
520 | if (r != -ESTALE && is_leader()) { | |
521 | m_image_sync_throttler->finish_op(m_ioctx.get_namespace(), sync_id); | |
31f18b77 FG |
522 | } |
523 | }); | |
524 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
525 | std::move(bl), ctx); | |
526 | req->send(); | |
527 | } | |
528 | ||
529 | template <typename I> | |
530 | void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) { | |
9f95a23c | 531 | std::lock_guard locker{m_lock}; |
28e407b8 AA |
532 | notify_sync_complete(m_lock, sync_id); |
533 | } | |
534 | ||
535 | template <typename I> | |
9f95a23c | 536 | void InstanceWatcher<I>::notify_sync_complete(const ceph::mutex&, |
28e407b8 AA |
537 | const std::string &sync_id) { |
538 | dout(10) << "sync_id=" << sync_id << dendl; | |
9f95a23c | 539 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
540 | |
541 | auto it = m_inflight_sync_reqs.find(sync_id); | |
11fdf7f2 | 542 | ceph_assert(it != m_inflight_sync_reqs.end()); |
31f18b77 FG |
543 | |
544 | auto sync_ctx = it->second; | |
11fdf7f2 | 545 | ceph_assert(sync_ctx->req == nullptr); |
31f18b77 FG |
546 | |
547 | m_inflight_sync_reqs.erase(it); | |
548 | m_work_queue->queue(sync_ctx, 0); | |
549 | } | |
550 | ||
551 | template <typename I> | |
552 | void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx, | |
553 | int r) { | |
11fdf7f2 | 554 | dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; |
31f18b77 FG |
555 | |
556 | Context *on_start = nullptr; | |
557 | { | |
9f95a23c | 558 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
559 | ceph_assert(sync_ctx->req != nullptr); |
560 | ceph_assert(sync_ctx->on_start != nullptr); | |
31f18b77 FG |
561 | |
562 | if (sync_ctx->req->canceling) { | |
563 | r = -ECANCELED; | |
564 | } | |
565 | ||
566 | std::swap(sync_ctx->on_start, on_start); | |
567 | sync_ctx->req = nullptr; | |
28e407b8 AA |
568 | |
569 | if (r == -ECANCELED) { | |
570 | notify_sync_complete(m_lock, sync_ctx->sync_id); | |
571 | } | |
31f18b77 FG |
572 | } |
573 | ||
574 | on_start->complete(r == -ECANCELED ? r : 0); | |
31f18b77 FG |
575 | } |
576 | ||
577 | template <typename I> | |
578 | void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx, | |
579 | int r) { | |
11fdf7f2 | 580 | dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; |
31f18b77 FG |
581 | |
582 | if (sync_ctx->on_complete != nullptr) { | |
583 | sync_ctx->on_complete->complete(r); | |
584 | } | |
585 | } | |
586 | ||
31f18b77 FG |
587 | template <typename I> |
588 | void InstanceWatcher<I>::handle_acquire_leader() { | |
11fdf7f2 | 589 | dout(10) << dendl; |
31f18b77 | 590 | |
9f95a23c | 591 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
592 | |
593 | m_leader_instance_id = m_instance_id; | |
594 | unsuspend_notify_requests(); | |
595 | } | |
596 | ||
597 | template <typename I> | |
598 | void InstanceWatcher<I>::handle_release_leader() { | |
11fdf7f2 | 599 | dout(10) << dendl; |
31f18b77 | 600 | |
9f95a23c | 601 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
602 | |
603 | m_leader_instance_id.clear(); | |
604 | ||
9f95a23c | 605 | m_image_sync_throttler->drain(m_ioctx.get_namespace(), -ESTALE); |
31f18b77 FG |
606 | } |
607 | ||
608 | template <typename I> | |
609 | void InstanceWatcher<I>::handle_update_leader( | |
610 | const std::string &leader_instance_id) { | |
11fdf7f2 | 611 | dout(10) << "leader_instance_id=" << leader_instance_id << dendl; |
31f18b77 | 612 | |
9f95a23c | 613 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
614 | |
615 | m_leader_instance_id = leader_instance_id; | |
616 | ||
617 | if (!m_leader_instance_id.empty()) { | |
618 | unsuspend_notify_requests(); | |
619 | } | |
620 | } | |
621 | ||
7c673cae FG |
622 | template <typename I> |
623 | void InstanceWatcher<I>::cancel_notify_requests( | |
624 | const std::string &instance_id) { | |
11fdf7f2 | 625 | dout(10) << "instance_id=" << instance_id << dendl; |
7c673cae | 626 | |
9f95a23c | 627 | std::lock_guard locker{m_lock}; |
7c673cae FG |
628 | |
629 | for (auto op : m_notify_ops) { | |
31f18b77 | 630 | if (op.first == instance_id && !op.second->send_to_leader) { |
7c673cae FG |
631 | op.second->cancel(); |
632 | } | |
633 | } | |
634 | } | |
635 | ||
7c673cae FG |
636 | template <typename I> |
637 | void InstanceWatcher<I>::register_instance() { | |
9f95a23c | 638 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 639 | |
11fdf7f2 | 640 | dout(10) << dendl; |
7c673cae FG |
641 | |
642 | librados::ObjectWriteOperation op; | |
643 | librbd::cls_client::mirror_instances_add(&op, m_instance_id); | |
644 | librados::AioCompletion *aio_comp = create_rados_callback< | |
645 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this); | |
646 | ||
647 | int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); | |
11fdf7f2 | 648 | ceph_assert(r == 0); |
7c673cae FG |
649 | aio_comp->release(); |
650 | } | |
651 | ||
652 | template <typename I> | |
653 | void InstanceWatcher<I>::handle_register_instance(int r) { | |
11fdf7f2 | 654 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
655 | |
656 | Context *on_finish = nullptr; | |
657 | { | |
9f95a23c | 658 | std::lock_guard locker{m_lock}; |
7c673cae FG |
659 | |
660 | if (r == 0) { | |
661 | create_instance_object(); | |
662 | return; | |
663 | } | |
664 | ||
665 | derr << "error registering instance: " << cpp_strerror(r) << dendl; | |
666 | ||
667 | std::swap(on_finish, m_on_finish); | |
668 | } | |
669 | on_finish->complete(r); | |
670 | } | |
671 | ||
672 | ||
673 | template <typename I> | |
674 | void InstanceWatcher<I>::create_instance_object() { | |
11fdf7f2 | 675 | dout(10) << dendl; |
7c673cae | 676 | |
9f95a23c | 677 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
678 | |
679 | librados::ObjectWriteOperation op; | |
680 | op.create(true); | |
681 | ||
682 | librados::AioCompletion *aio_comp = create_rados_callback< | |
683 | InstanceWatcher<I>, | |
684 | &InstanceWatcher<I>::handle_create_instance_object>(this); | |
685 | int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); | |
11fdf7f2 | 686 | ceph_assert(r == 0); |
7c673cae FG |
687 | aio_comp->release(); |
688 | } | |
689 | ||
690 | template <typename I> | |
691 | void InstanceWatcher<I>::handle_create_instance_object(int r) { | |
11fdf7f2 | 692 | dout(10) << "r=" << r << dendl; |
7c673cae | 693 | |
9f95a23c | 694 | std::lock_guard locker{m_lock}; |
7c673cae FG |
695 | |
696 | if (r < 0) { | |
697 | derr << "error creating " << m_oid << " object: " << cpp_strerror(r) | |
698 | << dendl; | |
699 | ||
700 | m_ret_val = r; | |
701 | unregister_instance(); | |
702 | return; | |
703 | } | |
704 | ||
705 | register_watch(); | |
706 | } | |
707 | ||
708 | template <typename I> | |
709 | void InstanceWatcher<I>::register_watch() { | |
11fdf7f2 | 710 | dout(10) << dendl; |
7c673cae | 711 | |
9f95a23c | 712 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
713 | |
714 | Context *ctx = create_async_context_callback( | |
715 | m_work_queue, create_context_callback< | |
716 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this)); | |
717 | ||
718 | librbd::Watcher::register_watch(ctx); | |
719 | } | |
720 | ||
721 | template <typename I> | |
722 | void InstanceWatcher<I>::handle_register_watch(int r) { | |
11fdf7f2 | 723 | dout(10) << "r=" << r << dendl; |
7c673cae | 724 | |
9f95a23c | 725 | std::lock_guard locker{m_lock}; |
7c673cae FG |
726 | |
727 | if (r < 0) { | |
728 | derr << "error registering instance watcher for " << m_oid << " object: " | |
729 | << cpp_strerror(r) << dendl; | |
730 | ||
731 | m_ret_val = r; | |
732 | remove_instance_object(); | |
733 | return; | |
734 | } | |
735 | ||
736 | acquire_lock(); | |
737 | } | |
738 | ||
739 | template <typename I> | |
740 | void InstanceWatcher<I>::acquire_lock() { | |
11fdf7f2 | 741 | dout(10) << dendl; |
7c673cae | 742 | |
9f95a23c | 743 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
744 | |
745 | Context *ctx = create_async_context_callback( | |
746 | m_work_queue, create_context_callback< | |
747 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this)); | |
748 | ||
749 | m_instance_lock->acquire_lock(ctx); | |
750 | } | |
751 | ||
752 | template <typename I> | |
753 | void InstanceWatcher<I>::handle_acquire_lock(int r) { | |
11fdf7f2 | 754 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
755 | |
756 | Context *on_finish = nullptr; | |
757 | { | |
9f95a23c | 758 | std::lock_guard locker{m_lock}; |
7c673cae FG |
759 | |
760 | if (r < 0) { | |
761 | ||
762 | derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl; | |
763 | ||
764 | m_ret_val = r; | |
765 | unregister_watch(); | |
766 | return; | |
767 | } | |
768 | ||
769 | std::swap(on_finish, m_on_finish); | |
770 | } | |
771 | ||
772 | on_finish->complete(r); | |
773 | } | |
774 | ||
775 | template <typename I> | |
776 | void InstanceWatcher<I>::release_lock() { | |
11fdf7f2 | 777 | dout(10) << dendl; |
7c673cae | 778 | |
9f95a23c | 779 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
780 | |
781 | Context *ctx = create_async_context_callback( | |
782 | m_work_queue, create_context_callback< | |
783 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this)); | |
784 | ||
785 | m_instance_lock->shut_down(ctx); | |
786 | } | |
787 | ||
788 | template <typename I> | |
789 | void InstanceWatcher<I>::handle_release_lock(int r) { | |
11fdf7f2 | 790 | dout(10) << "r=" << r << dendl; |
7c673cae | 791 | |
9f95a23c | 792 | std::lock_guard locker{m_lock}; |
7c673cae FG |
793 | |
794 | if (r < 0) { | |
795 | derr << "error releasing instance lock: " << cpp_strerror(r) << dendl; | |
796 | } | |
797 | ||
798 | unregister_watch(); | |
799 | } | |
800 | ||
801 | template <typename I> | |
802 | void InstanceWatcher<I>::unregister_watch() { | |
11fdf7f2 | 803 | dout(10) << dendl; |
7c673cae | 804 | |
9f95a23c | 805 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
806 | |
807 | Context *ctx = create_async_context_callback( | |
808 | m_work_queue, create_context_callback< | |
809 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this)); | |
810 | ||
811 | librbd::Watcher::unregister_watch(ctx); | |
812 | } | |
813 | ||
814 | template <typename I> | |
815 | void InstanceWatcher<I>::handle_unregister_watch(int r) { | |
11fdf7f2 | 816 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
817 | |
818 | if (r < 0) { | |
819 | derr << "error unregistering instance watcher for " << m_oid << " object: " | |
820 | << cpp_strerror(r) << dendl; | |
821 | } | |
822 | ||
9f95a23c | 823 | std::lock_guard locker{m_lock}; |
7c673cae FG |
824 | remove_instance_object(); |
825 | } | |
826 | ||
827 | template <typename I> | |
828 | void InstanceWatcher<I>::remove_instance_object() { | |
9f95a23c | 829 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 830 | |
11fdf7f2 | 831 | dout(10) << dendl; |
7c673cae FG |
832 | |
833 | librados::ObjectWriteOperation op; | |
834 | op.remove(); | |
835 | ||
836 | librados::AioCompletion *aio_comp = create_rados_callback< | |
837 | InstanceWatcher<I>, | |
838 | &InstanceWatcher<I>::handle_remove_instance_object>(this); | |
839 | int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); | |
11fdf7f2 | 840 | ceph_assert(r == 0); |
7c673cae FG |
841 | aio_comp->release(); |
842 | } | |
843 | ||
844 | template <typename I> | |
845 | void InstanceWatcher<I>::handle_remove_instance_object(int r) { | |
11fdf7f2 | 846 | dout(10) << "r=" << r << dendl; |
7c673cae | 847 | |
11fdf7f2 | 848 | if (r == -ENOENT) { |
7c673cae FG |
849 | r = 0; |
850 | } | |
851 | ||
852 | if (r < 0) { | |
853 | derr << "error removing " << m_oid << " object: " << cpp_strerror(r) | |
854 | << dendl; | |
855 | } | |
856 | ||
9f95a23c | 857 | std::lock_guard locker{m_lock}; |
7c673cae FG |
858 | unregister_instance(); |
859 | } | |
860 | ||
861 | template <typename I> | |
862 | void InstanceWatcher<I>::unregister_instance() { | |
11fdf7f2 | 863 | dout(10) << dendl; |
7c673cae | 864 | |
9f95a23c | 865 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
866 | |
867 | librados::ObjectWriteOperation op; | |
868 | librbd::cls_client::mirror_instances_remove(&op, m_instance_id); | |
869 | librados::AioCompletion *aio_comp = create_rados_callback< | |
870 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this); | |
871 | ||
872 | int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); | |
11fdf7f2 | 873 | ceph_assert(r == 0); |
7c673cae FG |
874 | aio_comp->release(); |
875 | } | |
876 | ||
877 | template <typename I> | |
878 | void InstanceWatcher<I>::handle_unregister_instance(int r) { | |
11fdf7f2 | 879 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
880 | |
881 | if (r < 0) { | |
882 | derr << "error unregistering instance: " << cpp_strerror(r) << dendl; | |
883 | } | |
884 | ||
9f95a23c | 885 | std::lock_guard locker{m_lock}; |
7c673cae FG |
886 | wait_for_notify_ops(); |
887 | } | |
888 | ||
889 | template <typename I> | |
890 | void InstanceWatcher<I>::wait_for_notify_ops() { | |
11fdf7f2 | 891 | dout(10) << dendl; |
7c673cae | 892 | |
9f95a23c | 893 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
894 | |
895 | for (auto op : m_notify_ops) { | |
896 | op.second->cancel(); | |
897 | } | |
898 | ||
899 | Context *ctx = create_async_context_callback( | |
900 | m_work_queue, create_context_callback< | |
901 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this)); | |
902 | ||
903 | m_notify_op_tracker.wait_for_ops(ctx); | |
904 | } | |
905 | ||
906 | template <typename I> | |
907 | void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) { | |
11fdf7f2 | 908 | dout(10) << "r=" << r << dendl; |
7c673cae | 909 | |
11fdf7f2 | 910 | ceph_assert(r == 0); |
7c673cae FG |
911 | |
912 | Context *on_finish = nullptr; | |
913 | { | |
9f95a23c | 914 | std::lock_guard locker{m_lock}; |
7c673cae | 915 | |
11fdf7f2 | 916 | ceph_assert(m_notify_ops.empty()); |
7c673cae FG |
917 | |
918 | std::swap(on_finish, m_on_finish); | |
919 | r = m_ret_val; | |
7c673cae FG |
920 | } |
921 | on_finish->complete(r); | |
922 | } | |
923 | ||
924 | template <typename I> | |
925 | void InstanceWatcher<I>::get_instance_locker() { | |
11fdf7f2 | 926 | dout(10) << dendl; |
7c673cae | 927 | |
9f95a23c | 928 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
929 | |
930 | Context *ctx = create_async_context_callback( | |
931 | m_work_queue, create_context_callback< | |
932 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this)); | |
933 | ||
934 | m_instance_lock->get_locker(&m_instance_locker, ctx); | |
935 | } | |
936 | ||
937 | template <typename I> | |
938 | void InstanceWatcher<I>::handle_get_instance_locker(int r) { | |
11fdf7f2 | 939 | dout(10) << "r=" << r << dendl; |
7c673cae | 940 | |
9f95a23c | 941 | std::lock_guard locker{m_lock}; |
7c673cae FG |
942 | |
943 | if (r < 0) { | |
944 | if (r != -ENOENT) { | |
945 | derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl; | |
946 | } | |
947 | remove_instance_object(); | |
948 | return; | |
949 | } | |
950 | ||
951 | break_instance_lock(); | |
952 | } | |
953 | ||
954 | template <typename I> | |
955 | void InstanceWatcher<I>::break_instance_lock() { | |
11fdf7f2 | 956 | dout(10) << dendl; |
7c673cae | 957 | |
9f95a23c | 958 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
959 | |
960 | Context *ctx = create_async_context_callback( | |
961 | m_work_queue, create_context_callback< | |
962 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this)); | |
963 | ||
964 | m_instance_lock->break_lock(m_instance_locker, true, ctx); | |
965 | } | |
966 | ||
967 | template <typename I> | |
968 | void InstanceWatcher<I>::handle_break_instance_lock(int r) { | |
11fdf7f2 | 969 | dout(10) << "r=" << r << dendl; |
7c673cae | 970 | |
9f95a23c | 971 | std::lock_guard locker{m_lock}; |
7c673cae FG |
972 | |
973 | if (r < 0) { | |
974 | if (r != -ENOENT) { | |
975 | derr << "error breaking instance lock: " << cpp_strerror(r) << dendl; | |
976 | } | |
977 | remove_instance_object(); | |
978 | return; | |
979 | } | |
980 | ||
981 | remove_instance_object(); | |
982 | } | |
983 | ||
31f18b77 FG |
984 | template <typename I> |
985 | void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) { | |
11fdf7f2 | 986 | dout(10) << req << dendl; |
31f18b77 | 987 | |
9f95a23c | 988 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
989 | |
990 | auto result = m_suspended_ops.insert(req).second; | |
11fdf7f2 | 991 | ceph_assert(result); |
31f18b77 FG |
992 | } |
993 | ||
994 | template <typename I> | |
995 | bool InstanceWatcher<I>::unsuspend_notify_request( | |
996 | C_NotifyInstanceRequest *req) { | |
11fdf7f2 | 997 | dout(10) << req << dendl; |
31f18b77 | 998 | |
9f95a23c | 999 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
1000 | |
1001 | auto result = m_suspended_ops.erase(req); | |
1002 | if (result == 0) { | |
1003 | return false; | |
1004 | } | |
1005 | ||
1006 | req->send(); | |
1007 | return true; | |
1008 | } | |
1009 | ||
1010 | template <typename I> | |
1011 | void InstanceWatcher<I>::unsuspend_notify_requests() { | |
11fdf7f2 | 1012 | dout(10) << dendl; |
31f18b77 | 1013 | |
9f95a23c | 1014 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
1015 | |
1016 | std::set<C_NotifyInstanceRequest *> suspended_ops; | |
1017 | std::swap(m_suspended_ops, suspended_ops); | |
1018 | ||
1019 | for (auto op : suspended_ops) { | |
1020 | op->send(); | |
1021 | } | |
1022 | } | |
1023 | ||
7c673cae FG |
1024 | template <typename I> |
1025 | Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id, | |
1026 | uint64_t request_id, | |
1027 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1028 | dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id |
7c673cae FG |
1029 | << dendl; |
1030 | ||
9f95a23c | 1031 | std::lock_guard locker{m_lock}; |
7c673cae FG |
1032 | |
1033 | Context *ctx = nullptr; | |
1034 | Request request(instance_id, request_id); | |
1035 | auto it = m_requests.find(request); | |
1036 | ||
1037 | if (it != m_requests.end()) { | |
11fdf7f2 | 1038 | dout(10) << "duplicate for in-progress request" << dendl; |
7c673cae FG |
1039 | delete it->on_notify_ack; |
1040 | m_requests.erase(it); | |
1041 | } else { | |
31f18b77 | 1042 | ctx = create_async_context_callback( |
9f95a23c | 1043 | m_work_queue, new LambdaContext( |
31f18b77 FG |
1044 | [this, instance_id, request_id] (int r) { |
1045 | complete_request(instance_id, request_id, r); | |
1046 | })); | |
7c673cae FG |
1047 | } |
1048 | ||
1049 | request.on_notify_ack = on_notify_ack; | |
1050 | m_requests.insert(request); | |
1051 | return ctx; | |
1052 | } | |
1053 | ||
31f18b77 FG |
1054 | template <typename I> |
1055 | void InstanceWatcher<I>::complete_request(const std::string &instance_id, | |
1056 | uint64_t request_id, int r) { | |
11fdf7f2 | 1057 | dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id |
31f18b77 FG |
1058 | << dendl; |
1059 | ||
1060 | C_NotifyAck *on_notify_ack; | |
1061 | { | |
9f95a23c | 1062 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
1063 | Request request(instance_id, request_id); |
1064 | auto it = m_requests.find(request); | |
11fdf7f2 | 1065 | ceph_assert(it != m_requests.end()); |
31f18b77 FG |
1066 | on_notify_ack = it->on_notify_ack; |
1067 | m_requests.erase(it); | |
1068 | } | |
1069 | ||
11fdf7f2 | 1070 | encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out); |
31f18b77 FG |
1071 | on_notify_ack->complete(0); |
1072 | } | |
1073 | ||
7c673cae FG |
1074 | template <typename I> |
1075 | void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle, | |
1076 | uint64_t notifier_id, bufferlist &bl) { | |
11fdf7f2 | 1077 | dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", " |
7c673cae FG |
1078 | << "notifier_id=" << notifier_id << dendl; |
1079 | ||
1080 | auto ctx = new C_NotifyAck(this, notify_id, handle); | |
1081 | ||
1082 | NotifyMessage notify_message; | |
1083 | try { | |
11fdf7f2 TL |
1084 | auto iter = bl.cbegin(); |
1085 | decode(notify_message, iter); | |
7c673cae FG |
1086 | } catch (const buffer::error &err) { |
1087 | derr << "error decoding image notification: " << err.what() << dendl; | |
1088 | ctx->complete(0); | |
1089 | return; | |
1090 | } | |
1091 | ||
1092 | apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx), | |
1093 | notify_message.payload); | |
1094 | } | |
1095 | ||
1096 | template <typename I> | |
1097 | void InstanceWatcher<I>::handle_image_acquire( | |
d2e6a577 | 1098 | const std::string &global_image_id, Context *on_finish) { |
11fdf7f2 | 1099 | dout(10) << "global_image_id=" << global_image_id << dendl; |
7c673cae | 1100 | |
9f95a23c | 1101 | auto ctx = new LambdaContext( |
d2e6a577 FG |
1102 | [this, global_image_id, on_finish] (int r) { |
1103 | m_instance_replayer->acquire_image(this, global_image_id, on_finish); | |
31f18b77 FG |
1104 | m_notify_op_tracker.finish_op(); |
1105 | }); | |
1106 | ||
1107 | m_notify_op_tracker.start_op(); | |
1108 | m_work_queue->queue(ctx, 0); | |
7c673cae FG |
1109 | } |
1110 | ||
1111 | template <typename I> | |
1112 | void InstanceWatcher<I>::handle_image_release( | |
d2e6a577 | 1113 | const std::string &global_image_id, Context *on_finish) { |
11fdf7f2 | 1114 | dout(10) << "global_image_id=" << global_image_id << dendl; |
7c673cae | 1115 | |
9f95a23c | 1116 | auto ctx = new LambdaContext( |
d2e6a577 FG |
1117 | [this, global_image_id, on_finish] (int r) { |
1118 | m_instance_replayer->release_image(global_image_id, on_finish); | |
1119 | m_notify_op_tracker.finish_op(); | |
1120 | }); | |
1121 | ||
1122 | m_notify_op_tracker.start_op(); | |
1123 | m_work_queue->queue(ctx, 0); | |
1124 | } | |
1125 | ||
1126 | template <typename I> | |
1127 | void InstanceWatcher<I>::handle_peer_image_removed( | |
1128 | const std::string &global_image_id, const std::string &peer_mirror_uuid, | |
1129 | Context *on_finish) { | |
11fdf7f2 | 1130 | dout(10) << "global_image_id=" << global_image_id << ", " |
d2e6a577 FG |
1131 | << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; |
1132 | ||
9f95a23c | 1133 | auto ctx = new LambdaContext( |
d2e6a577 FG |
1134 | [this, peer_mirror_uuid, global_image_id, on_finish] (int r) { |
1135 | m_instance_replayer->remove_peer_image(global_image_id, | |
1136 | peer_mirror_uuid, on_finish); | |
31f18b77 FG |
1137 | m_notify_op_tracker.finish_op(); |
1138 | }); | |
1139 | ||
1140 | m_notify_op_tracker.start_op(); | |
1141 | m_work_queue->queue(ctx, 0); | |
1142 | } | |
1143 | ||
1144 | template <typename I> | |
1145 | void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id, | |
1146 | const std::string &sync_id, | |
1147 | Context *on_finish) { | |
11fdf7f2 | 1148 | dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; |
31f18b77 | 1149 | |
9f95a23c | 1150 | std::lock_guard locker{m_lock}; |
31f18b77 | 1151 | |
9f95a23c | 1152 | if (!is_leader()) { |
11fdf7f2 | 1153 | dout(10) << "sync request for non-leader" << dendl; |
31f18b77 FG |
1154 | m_work_queue->queue(on_finish, -ESTALE); |
1155 | return; | |
1156 | } | |
1157 | ||
1158 | Context *on_start = create_async_context_callback( | |
9f95a23c | 1159 | m_work_queue, new LambdaContext( |
31f18b77 | 1160 | [this, instance_id, sync_id, on_finish] (int r) { |
11fdf7f2 | 1161 | dout(10) << "handle_sync_request: finish: instance_id=" << instance_id |
31f18b77 FG |
1162 | << ", sync_id=" << sync_id << ", r=" << r << dendl; |
1163 | if (r == 0) { | |
1164 | notify_sync_start(instance_id, sync_id); | |
1165 | } | |
494da23a TL |
1166 | if (r == -ENOENT) { |
1167 | r = 0; | |
1168 | } | |
31f18b77 FG |
1169 | on_finish->complete(r); |
1170 | })); | |
9f95a23c | 1171 | m_image_sync_throttler->start_op(m_ioctx.get_namespace(), sync_id, on_start); |
31f18b77 FG |
1172 | } |
1173 | ||
1174 | template <typename I> | |
1175 | void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id, | |
1176 | const std::string &sync_id, | |
1177 | Context *on_finish) { | |
11fdf7f2 | 1178 | dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; |
31f18b77 | 1179 | |
9f95a23c | 1180 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
1181 | |
1182 | auto it = m_inflight_sync_reqs.find(sync_id); | |
1183 | if (it == m_inflight_sync_reqs.end()) { | |
11fdf7f2 | 1184 | dout(5) << "not found" << dendl; |
31f18b77 FG |
1185 | m_work_queue->queue(on_finish, 0); |
1186 | return; | |
1187 | } | |
1188 | ||
1189 | auto sync_ctx = it->second; | |
1190 | ||
1191 | if (sync_ctx->on_complete != nullptr) { | |
11fdf7f2 | 1192 | dout(5) << "duplicate request" << dendl; |
31f18b77 FG |
1193 | m_work_queue->queue(sync_ctx->on_complete, -ESTALE); |
1194 | } | |
1195 | ||
1196 | sync_ctx->on_complete = on_finish; | |
7c673cae FG |
1197 | } |
1198 | ||
1199 | template <typename I> | |
1200 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1201 | const ImageAcquirePayload &payload, | |
1202 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1203 | dout(10) << "image_acquire: instance_id=" << instance_id << ", " |
7c673cae FG |
1204 | << "request_id=" << payload.request_id << dendl; |
1205 | ||
1206 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1207 | on_notify_ack); | |
1208 | if (on_finish != nullptr) { | |
d2e6a577 | 1209 | handle_image_acquire(payload.global_image_id, on_finish); |
7c673cae FG |
1210 | } |
1211 | } | |
1212 | ||
1213 | template <typename I> | |
1214 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1215 | const ImageReleasePayload &payload, | |
1216 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1217 | dout(10) << "image_release: instance_id=" << instance_id << ", " |
7c673cae FG |
1218 | << "request_id=" << payload.request_id << dendl; |
1219 | ||
1220 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1221 | on_notify_ack); | |
1222 | if (on_finish != nullptr) { | |
d2e6a577 FG |
1223 | handle_image_release(payload.global_image_id, on_finish); |
1224 | } | |
1225 | } | |
1226 | ||
1227 | template <typename I> | |
1228 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1229 | const PeerImageRemovedPayload &payload, | |
1230 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1231 | dout(10) << "remove_peer_image: instance_id=" << instance_id << ", " |
d2e6a577 FG |
1232 | << "request_id=" << payload.request_id << dendl; |
1233 | ||
1234 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1235 | on_notify_ack); | |
1236 | if (on_finish != nullptr) { | |
1237 | handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid, | |
1238 | on_finish); | |
7c673cae FG |
1239 | } |
1240 | } | |
1241 | ||
31f18b77 FG |
1242 | template <typename I> |
1243 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1244 | const SyncRequestPayload &payload, | |
1245 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1246 | dout(10) << "sync_request: instance_id=" << instance_id << ", " |
31f18b77 FG |
1247 | << "request_id=" << payload.request_id << dendl; |
1248 | ||
1249 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1250 | on_notify_ack); | |
1251 | if (on_finish == nullptr) { | |
1252 | return; | |
1253 | } | |
1254 | ||
1255 | handle_sync_request(instance_id, payload.sync_id, on_finish); | |
1256 | } | |
1257 | ||
1258 | template <typename I> | |
1259 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1260 | const SyncStartPayload &payload, | |
1261 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1262 | dout(10) << "sync_start: instance_id=" << instance_id << ", " |
31f18b77 FG |
1263 | << "request_id=" << payload.request_id << dendl; |
1264 | ||
1265 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1266 | on_notify_ack); | |
1267 | if (on_finish == nullptr) { | |
1268 | return; | |
1269 | } | |
1270 | ||
1271 | handle_sync_start(instance_id, payload.sync_id, on_finish); | |
1272 | } | |
1273 | ||
7c673cae FG |
1274 | template <typename I> |
1275 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1276 | const UnknownPayload &payload, | |
1277 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1278 | dout(5) << "unknown: instance_id=" << instance_id << dendl; |
7c673cae FG |
1279 | |
1280 | on_notify_ack->complete(0); | |
1281 | } | |
1282 | ||
1283 | } // namespace mirror | |
1284 | } // namespace rbd | |
1285 | ||
1286 | template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>; |