]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "InstanceWatcher.h" | |
7c673cae FG |
5 | #include "include/stringify.h" |
6 | #include "common/debug.h" | |
7 | #include "common/errno.h" | |
8 | #include "cls/rbd/cls_rbd_client.h" | |
f67539c2 | 9 | #include "librbd/AsioEngine.h" |
7c673cae FG |
10 | #include "librbd/ManagedLock.h" |
11 | #include "librbd/Utils.h" | |
f67539c2 | 12 | #include "librbd/asio/ContextWQ.h" |
7c673cae | 13 | #include "InstanceReplayer.h" |
9f95a23c | 14 | #include "Throttler.h" |
11fdf7f2 | 15 | #include "common/Cond.h" |
7c673cae FG |
16 | |
17 | #define dout_context g_ceph_context | |
18 | #define dout_subsys ceph_subsys_rbd_mirror | |
19 | #undef dout_prefix | |
20 | #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " | |
21 | ||
22 | namespace rbd { | |
23 | namespace mirror { | |
24 | ||
25 | using namespace instance_watcher; | |
26 | ||
27 | using librbd::util::create_async_context_callback; | |
28 | using librbd::util::create_context_callback; | |
29 | using librbd::util::create_rados_callback; | |
30 | using librbd::util::unique_lock_name; | |
31 | ||
32 | namespace { | |
33 | ||
34 | struct C_GetInstances : public Context { | |
35 | std::vector<std::string> *instance_ids; | |
36 | Context *on_finish; | |
37 | bufferlist out_bl; | |
38 | ||
39 | C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish) | |
40 | : instance_ids(instance_ids), on_finish(on_finish) { | |
41 | } | |
42 | ||
43 | void finish(int r) override { | |
11fdf7f2 | 44 | dout(10) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r |
7c673cae FG |
45 | << dendl; |
46 | ||
47 | if (r == 0) { | |
11fdf7f2 | 48 | auto it = out_bl.cbegin(); |
7c673cae FG |
49 | r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids); |
50 | } else if (r == -ENOENT) { | |
51 | r = 0; | |
52 | } | |
53 | on_finish->complete(r); | |
54 | } | |
55 | }; | |
56 | ||
57 | template <typename I> | |
58 | struct C_RemoveInstanceRequest : public Context { | |
59 | InstanceWatcher<I> instance_watcher; | |
60 | Context *on_finish; | |
61 | ||
f67539c2 TL |
62 | C_RemoveInstanceRequest(librados::IoCtx &io_ctx, |
63 | librbd::AsioEngine& asio_engine, | |
7c673cae | 64 | const std::string &instance_id, Context *on_finish) |
f67539c2 | 65 | : instance_watcher(io_ctx, asio_engine, nullptr, nullptr, instance_id), |
7c673cae FG |
66 | on_finish(on_finish) { |
67 | } | |
68 | ||
69 | void send() { | |
11fdf7f2 | 70 | dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl; |
7c673cae FG |
71 | |
72 | instance_watcher.remove(this); | |
73 | } | |
74 | ||
75 | void finish(int r) override { | |
11fdf7f2 | 76 | dout(10) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r=" |
7c673cae | 77 | << r << dendl; |
11fdf7f2 | 78 | ceph_assert(r == 0); |
7c673cae FG |
79 | |
80 | on_finish->complete(r); | |
81 | } | |
82 | }; | |
83 | ||
84 | } // anonymous namespace | |
85 | ||
86 | template <typename I> | |
87 | struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context { | |
88 | InstanceWatcher<I> *instance_watcher; | |
7c673cae FG |
89 | std::string instance_id; |
90 | uint64_t request_id; | |
91 | bufferlist bl; | |
92 | Context *on_finish; | |
31f18b77 FG |
93 | bool send_to_leader; |
94 | std::unique_ptr<librbd::watcher::Notifier> notifier; | |
7c673cae | 95 | librbd::watcher::NotifyResponse response; |
31f18b77 | 96 | bool canceling = false; |
7c673cae FG |
97 | |
98 | C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher, | |
99 | const std::string &instance_id, uint64_t request_id, | |
100 | bufferlist &&bl, Context *on_finish) | |
31f18b77 FG |
101 | : instance_watcher(instance_watcher), instance_id(instance_id), |
102 | request_id(request_id), bl(bl), on_finish(on_finish), | |
103 | send_to_leader(instance_id.empty()) { | |
11fdf7f2 | 104 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
31f18b77 FG |
105 | << ": instance_watcher=" << instance_watcher << ", instance_id=" |
106 | << instance_id << ", request_id=" << request_id << dendl; | |
107 | ||
9f95a23c | 108 | ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); |
31f18b77 FG |
109 | |
110 | if (!send_to_leader) { | |
11fdf7f2 | 111 | ceph_assert((!instance_id.empty())); |
31f18b77 FG |
112 | notifier.reset(new librbd::watcher::Notifier( |
113 | instance_watcher->m_work_queue, | |
114 | instance_watcher->m_ioctx, | |
115 | RBD_MIRROR_INSTANCE_PREFIX + instance_id)); | |
116 | } | |
117 | ||
118 | instance_watcher->m_notify_op_tracker.start_op(); | |
7c673cae FG |
119 | auto result = instance_watcher->m_notify_ops.insert( |
120 | std::make_pair(instance_id, this)).second; | |
11fdf7f2 | 121 | ceph_assert(result); |
7c673cae FG |
122 | } |
123 | ||
124 | void send() { | |
11fdf7f2 | 125 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; |
7c673cae | 126 | |
9f95a23c | 127 | ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); |
31f18b77 FG |
128 | |
129 | if (canceling) { | |
11fdf7f2 | 130 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
31f18b77 FG |
131 | << ": canceling" << dendl; |
132 | instance_watcher->m_work_queue->queue(this, -ECANCELED); | |
133 | return; | |
134 | } | |
135 | ||
136 | if (send_to_leader) { | |
137 | if (instance_watcher->m_leader_instance_id.empty()) { | |
11fdf7f2 | 138 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
31f18b77 FG |
139 | << ": suspending" << dendl; |
140 | instance_watcher->suspend_notify_request(this); | |
141 | return; | |
142 | } | |
143 | ||
144 | if (instance_watcher->m_leader_instance_id != instance_id) { | |
145 | auto count = instance_watcher->m_notify_ops.erase( | |
146 | std::make_pair(instance_id, this)); | |
11fdf7f2 | 147 | ceph_assert(count > 0); |
31f18b77 FG |
148 | |
149 | instance_id = instance_watcher->m_leader_instance_id; | |
150 | ||
151 | auto result = instance_watcher->m_notify_ops.insert( | |
152 | std::make_pair(instance_id, this)).second; | |
11fdf7f2 | 153 | ceph_assert(result); |
31f18b77 FG |
154 | |
155 | notifier.reset(new librbd::watcher::Notifier( | |
156 | instance_watcher->m_work_queue, | |
157 | instance_watcher->m_ioctx, | |
158 | RBD_MIRROR_INSTANCE_PREFIX + instance_id)); | |
159 | } | |
160 | } | |
161 | ||
11fdf7f2 | 162 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
d2e6a577 | 163 | << ": sending to " << instance_id << dendl; |
31f18b77 | 164 | notifier->notify(bl, &response, this); |
7c673cae FG |
165 | } |
166 | ||
167 | void cancel() { | |
11fdf7f2 | 168 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; |
7c673cae | 169 | |
9f95a23c | 170 | ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); |
31f18b77 FG |
171 | |
172 | canceling = true; | |
173 | instance_watcher->unsuspend_notify_request(this); | |
7c673cae FG |
174 | } |
175 | ||
176 | void finish(int r) override { | |
11fdf7f2 | 177 | dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r=" |
7c673cae FG |
178 | << r << dendl; |
179 | ||
180 | if (r == 0 || r == -ETIMEDOUT) { | |
181 | bool found = false; | |
182 | for (auto &it : response.acks) { | |
183 | auto &bl = it.second; | |
184 | if (it.second.length() == 0) { | |
11fdf7f2 TL |
185 | dout(5) << "C_NotifyInstanceRequest: " << this << " " << __func__ |
186 | << ": no payload in ack, ignoring" << dendl; | |
7c673cae FG |
187 | continue; |
188 | } | |
189 | try { | |
11fdf7f2 | 190 | auto iter = bl.cbegin(); |
7c673cae | 191 | NotifyAckPayload ack; |
11fdf7f2 | 192 | decode(ack, iter); |
7c673cae FG |
193 | if (ack.instance_id != instance_watcher->get_instance_id()) { |
194 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
195 | << ": ack instance_id (" << ack.instance_id << ") " | |
196 | << "does not match, ignoring" << dendl; | |
197 | continue; | |
198 | } | |
199 | if (ack.request_id != request_id) { | |
200 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
201 | << ": ack request_id (" << ack.request_id << ") " | |
202 | << "does not match, ignoring" << dendl; | |
203 | continue; | |
204 | } | |
205 | r = ack.ret_val; | |
206 | found = true; | |
207 | break; | |
208 | } catch (const buffer::error &err) { | |
209 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
210 | << ": failed to decode ack: " << err.what() << dendl; | |
211 | continue; | |
212 | } | |
213 | } | |
214 | ||
215 | if (!found) { | |
216 | if (r == -ETIMEDOUT) { | |
31f18b77 FG |
217 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ |
218 | << ": resending after timeout" << dendl; | |
9f95a23c | 219 | std::lock_guard locker{instance_watcher->m_lock}; |
31f18b77 FG |
220 | send(); |
221 | return; | |
7c673cae FG |
222 | } else { |
223 | r = -EINVAL; | |
224 | } | |
31f18b77 FG |
225 | } else { |
226 | if (r == -ESTALE && send_to_leader) { | |
227 | derr << "C_NotifyInstanceRequest: " << this << " " << __func__ | |
228 | << ": resending due to leader change" << dendl; | |
9f95a23c | 229 | std::lock_guard locker{instance_watcher->m_lock}; |
31f18b77 FG |
230 | send(); |
231 | return; | |
232 | } | |
7c673cae FG |
233 | } |
234 | } | |
235 | ||
7c673cae FG |
236 | on_finish->complete(r); |
237 | ||
31f18b77 | 238 | { |
9f95a23c | 239 | std::lock_guard locker{instance_watcher->m_lock}; |
31f18b77 | 240 | auto result = instance_watcher->m_notify_ops.erase( |
7c673cae | 241 | std::make_pair(instance_id, this)); |
11fdf7f2 | 242 | ceph_assert(result > 0); |
31f18b77 FG |
243 | instance_watcher->m_notify_op_tracker.finish_op(); |
244 | } | |
245 | ||
7c673cae FG |
246 | delete this; |
247 | } | |
248 | ||
249 | void complete(int r) override { | |
250 | finish(r); | |
251 | } | |
252 | }; | |
253 | ||
31f18b77 FG |
254 | template <typename I> |
255 | struct InstanceWatcher<I>::C_SyncRequest : public Context { | |
256 | InstanceWatcher<I> *instance_watcher; | |
257 | std::string sync_id; | |
258 | Context *on_start; | |
259 | Context *on_complete = nullptr; | |
260 | C_NotifyInstanceRequest *req = nullptr; | |
261 | ||
262 | C_SyncRequest(InstanceWatcher<I> *instance_watcher, | |
263 | const std::string &sync_id, Context *on_start) | |
264 | : instance_watcher(instance_watcher), sync_id(sync_id), | |
265 | on_start(on_start) { | |
11fdf7f2 | 266 | dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id=" |
31f18b77 FG |
267 | << sync_id << dendl; |
268 | } | |
269 | ||
270 | void finish(int r) override { | |
11fdf7f2 | 271 | dout(10) << "C_SyncRequest: " << this << " " << __func__ << ": r=" |
31f18b77 FG |
272 | << r << dendl; |
273 | ||
274 | if (on_start != nullptr) { | |
275 | instance_watcher->handle_notify_sync_request(this, r); | |
276 | } else { | |
277 | instance_watcher->handle_notify_sync_complete(this, r); | |
278 | delete this; | |
279 | } | |
280 | } | |
281 | ||
282 | // called twice | |
283 | void complete(int r) override { | |
284 | finish(r); | |
285 | } | |
286 | }; | |
287 | ||
7c673cae FG |
288 | #undef dout_prefix |
289 | #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ | |
290 | << this << " " << __func__ << ": " | |
291 | template <typename I> | |
292 | void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx, | |
293 | std::vector<std::string> *instance_ids, | |
294 | Context *on_finish) { | |
295 | librados::ObjectReadOperation op; | |
296 | librbd::cls_client::mirror_instances_list_start(&op); | |
297 | C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish); | |
298 | librados::AioCompletion *aio_comp = create_rados_callback(ctx); | |
299 | ||
300 | int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl); | |
11fdf7f2 | 301 | ceph_assert(r == 0); |
7c673cae FG |
302 | aio_comp->release(); |
303 | } | |
304 | ||
305 | template <typename I> | |
306 | void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx, | |
f67539c2 | 307 | librbd::AsioEngine& asio_engine, |
7c673cae FG |
308 | const std::string &instance_id, |
309 | Context *on_finish) { | |
f67539c2 | 310 | auto req = new C_RemoveInstanceRequest<I>(io_ctx, asio_engine, instance_id, |
7c673cae FG |
311 | on_finish); |
312 | req->send(); | |
313 | } | |
314 | ||
315 | template <typename I> | |
316 | InstanceWatcher<I> *InstanceWatcher<I>::create( | |
f67539c2 | 317 | librados::IoCtx &io_ctx, librbd::AsioEngine& asio_engine, |
9f95a23c TL |
318 | InstanceReplayer<I> *instance_replayer, |
319 | Throttler<I> *image_sync_throttler) { | |
f67539c2 | 320 | return new InstanceWatcher<I>(io_ctx, asio_engine, instance_replayer, |
9f95a23c | 321 | image_sync_throttler, |
7c673cae FG |
322 | stringify(io_ctx.get_instance_id())); |
323 | } | |
324 | ||
325 | template <typename I> | |
326 | InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx, | |
f67539c2 | 327 | librbd::AsioEngine& asio_engine, |
7c673cae | 328 | InstanceReplayer<I> *instance_replayer, |
9f95a23c | 329 | Throttler<I> *image_sync_throttler, |
7c673cae | 330 | const std::string &instance_id) |
f67539c2 TL |
331 | : Watcher(io_ctx, asio_engine.get_work_queue(), |
332 | RBD_MIRROR_INSTANCE_PREFIX + instance_id), | |
9f95a23c TL |
333 | m_instance_replayer(instance_replayer), |
334 | m_image_sync_throttler(image_sync_throttler), m_instance_id(instance_id), | |
335 | m_lock(ceph::make_mutex( | |
336 | unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this))), | |
7c673cae | 337 | m_instance_lock(librbd::ManagedLock<I>::create( |
f67539c2 TL |
338 | m_ioctx, asio_engine, m_oid, this, librbd::managed_lock::EXCLUSIVE, true, |
339 | m_cct->_conf.get_val<uint64_t>("rbd_blocklist_expire_seconds"))) { | |
7c673cae FG |
340 | } |
341 | ||
342 | template <typename I> | |
343 | InstanceWatcher<I>::~InstanceWatcher() { | |
11fdf7f2 TL |
344 | ceph_assert(m_requests.empty()); |
345 | ceph_assert(m_notify_ops.empty()); | |
346 | ceph_assert(m_notify_op_tracker.empty()); | |
347 | ceph_assert(m_suspended_ops.empty()); | |
348 | ceph_assert(m_inflight_sync_reqs.empty()); | |
7c673cae FG |
349 | m_instance_lock->destroy(); |
350 | } | |
351 | ||
352 | template <typename I> | |
353 | int InstanceWatcher<I>::init() { | |
354 | C_SaferCond init_ctx; | |
355 | init(&init_ctx); | |
356 | return init_ctx.wait(); | |
357 | } | |
358 | ||
359 | template <typename I> | |
360 | void InstanceWatcher<I>::init(Context *on_finish) { | |
11fdf7f2 | 361 | dout(10) << "instance_id=" << m_instance_id << dendl; |
7c673cae | 362 | |
9f95a23c | 363 | std::lock_guard locker{m_lock}; |
7c673cae | 364 | |
11fdf7f2 | 365 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
366 | m_on_finish = on_finish; |
367 | m_ret_val = 0; | |
368 | ||
369 | register_instance(); | |
370 | } | |
371 | ||
372 | template <typename I> | |
373 | void InstanceWatcher<I>::shut_down() { | |
374 | C_SaferCond shut_down_ctx; | |
375 | shut_down(&shut_down_ctx); | |
376 | int r = shut_down_ctx.wait(); | |
11fdf7f2 | 377 | ceph_assert(r == 0); |
7c673cae FG |
378 | } |
379 | ||
380 | template <typename I> | |
381 | void InstanceWatcher<I>::shut_down(Context *on_finish) { | |
11fdf7f2 | 382 | dout(10) << dendl; |
7c673cae | 383 | |
9f95a23c | 384 | std::lock_guard locker{m_lock}; |
7c673cae | 385 | |
11fdf7f2 | 386 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
387 | m_on_finish = on_finish; |
388 | m_ret_val = 0; | |
389 | ||
390 | release_lock(); | |
391 | } | |
392 | ||
393 | template <typename I> | |
394 | void InstanceWatcher<I>::remove(Context *on_finish) { | |
11fdf7f2 | 395 | dout(10) << dendl; |
7c673cae | 396 | |
9f95a23c | 397 | std::lock_guard locker{m_lock}; |
7c673cae | 398 | |
11fdf7f2 | 399 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
400 | m_on_finish = on_finish; |
401 | m_ret_val = 0; | |
7c673cae FG |
402 | |
403 | get_instance_locker(); | |
404 | } | |
405 | ||
406 | template <typename I> | |
407 | void InstanceWatcher<I>::notify_image_acquire( | |
408 | const std::string &instance_id, const std::string &global_image_id, | |
d2e6a577 | 409 | Context *on_notify_ack) { |
11fdf7f2 | 410 | dout(10) << "instance_id=" << instance_id << ", global_image_id=" |
7c673cae FG |
411 | << global_image_id << dendl; |
412 | ||
9f95a23c | 413 | std::lock_guard locker{m_lock}; |
7c673cae | 414 | |
11fdf7f2 | 415 | ceph_assert(m_on_finish == nullptr); |
7c673cae | 416 | |
11fdf7f2 TL |
417 | uint64_t request_id = ++m_request_seq; |
418 | bufferlist bl; | |
419 | encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}}, bl); | |
420 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
421 | std::move(bl), on_notify_ack); | |
422 | req->send(); | |
7c673cae FG |
423 | } |
424 | ||
425 | template <typename I> | |
426 | void InstanceWatcher<I>::notify_image_release( | |
d2e6a577 FG |
427 | const std::string &instance_id, const std::string &global_image_id, |
428 | Context *on_notify_ack) { | |
11fdf7f2 | 429 | dout(10) << "instance_id=" << instance_id << ", global_image_id=" |
7c673cae FG |
430 | << global_image_id << dendl; |
431 | ||
9f95a23c | 432 | std::lock_guard locker{m_lock}; |
7c673cae | 433 | |
11fdf7f2 | 434 | ceph_assert(m_on_finish == nullptr); |
7c673cae | 435 | |
11fdf7f2 TL |
436 | uint64_t request_id = ++m_request_seq; |
437 | bufferlist bl; | |
438 | encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}}, bl); | |
439 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
440 | std::move(bl), on_notify_ack); | |
441 | req->send(); | |
d2e6a577 FG |
442 | } |
443 | ||
444 | template <typename I> | |
445 | void InstanceWatcher<I>::notify_peer_image_removed( | |
446 | const std::string &instance_id, const std::string &global_image_id, | |
447 | const std::string &peer_mirror_uuid, Context *on_notify_ack) { | |
11fdf7f2 | 448 | dout(10) << "instance_id=" << instance_id << ", " |
d2e6a577 FG |
449 | << "global_image_id=" << global_image_id << ", " |
450 | << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; | |
451 | ||
9f95a23c | 452 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 453 | ceph_assert(m_on_finish == nullptr); |
d2e6a577 | 454 | |
11fdf7f2 TL |
455 | uint64_t request_id = ++m_request_seq; |
456 | bufferlist bl; | |
457 | encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id, | |
458 | peer_mirror_uuid}}, bl); | |
459 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
460 | std::move(bl), on_notify_ack); | |
461 | req->send(); | |
7c673cae FG |
462 | } |
463 | ||
31f18b77 FG |
464 | template <typename I> |
465 | void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id, | |
466 | Context *on_sync_start) { | |
11fdf7f2 | 467 | dout(10) << "sync_id=" << sync_id << dendl; |
31f18b77 | 468 | |
9f95a23c | 469 | std::lock_guard locker{m_lock}; |
31f18b77 | 470 | |
11fdf7f2 | 471 | ceph_assert(m_inflight_sync_reqs.count(sync_id) == 0); |
31f18b77 FG |
472 | |
473 | uint64_t request_id = ++m_request_seq; | |
474 | ||
475 | bufferlist bl; | |
11fdf7f2 | 476 | encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl); |
31f18b77 FG |
477 | |
478 | auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start); | |
479 | sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id, | |
480 | std::move(bl), sync_ctx); | |
481 | ||
482 | m_inflight_sync_reqs[sync_id] = sync_ctx; | |
483 | sync_ctx->req->send(); | |
484 | } | |
485 | ||
486 | template <typename I> | |
487 | bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) { | |
11fdf7f2 | 488 | dout(10) << "sync_id=" << sync_id << dendl; |
31f18b77 | 489 | |
9f95a23c | 490 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
491 | |
492 | auto it = m_inflight_sync_reqs.find(sync_id); | |
493 | if (it == m_inflight_sync_reqs.end()) { | |
494 | return false; | |
495 | } | |
496 | ||
497 | auto sync_ctx = it->second; | |
498 | ||
499 | if (sync_ctx->on_start == nullptr) { | |
500 | return false; | |
501 | } | |
502 | ||
11fdf7f2 | 503 | ceph_assert(sync_ctx->req != nullptr); |
31f18b77 FG |
504 | sync_ctx->req->cancel(); |
505 | return true; | |
506 | } | |
507 | ||
508 | template <typename I> | |
509 | void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id, | |
510 | const std::string &sync_id) { | |
11fdf7f2 | 511 | dout(10) << "sync_id=" << sync_id << dendl; |
31f18b77 | 512 | |
9f95a23c | 513 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
514 | |
515 | uint64_t request_id = ++m_request_seq; | |
516 | ||
517 | bufferlist bl; | |
11fdf7f2 | 518 | encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl); |
31f18b77 | 519 | |
9f95a23c | 520 | auto ctx = new LambdaContext( |
31f18b77 | 521 | [this, sync_id] (int r) { |
11fdf7f2 | 522 | dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl; |
9f95a23c TL |
523 | std::lock_guard locker{m_lock}; |
524 | if (r != -ESTALE && is_leader()) { | |
525 | m_image_sync_throttler->finish_op(m_ioctx.get_namespace(), sync_id); | |
31f18b77 FG |
526 | } |
527 | }); | |
528 | auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, | |
529 | std::move(bl), ctx); | |
530 | req->send(); | |
531 | } | |
532 | ||
533 | template <typename I> | |
534 | void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) { | |
9f95a23c | 535 | std::lock_guard locker{m_lock}; |
28e407b8 AA |
536 | notify_sync_complete(m_lock, sync_id); |
537 | } | |
538 | ||
539 | template <typename I> | |
9f95a23c | 540 | void InstanceWatcher<I>::notify_sync_complete(const ceph::mutex&, |
28e407b8 AA |
541 | const std::string &sync_id) { |
542 | dout(10) << "sync_id=" << sync_id << dendl; | |
9f95a23c | 543 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
544 | |
545 | auto it = m_inflight_sync_reqs.find(sync_id); | |
11fdf7f2 | 546 | ceph_assert(it != m_inflight_sync_reqs.end()); |
31f18b77 FG |
547 | |
548 | auto sync_ctx = it->second; | |
11fdf7f2 | 549 | ceph_assert(sync_ctx->req == nullptr); |
31f18b77 FG |
550 | |
551 | m_inflight_sync_reqs.erase(it); | |
552 | m_work_queue->queue(sync_ctx, 0); | |
553 | } | |
554 | ||
555 | template <typename I> | |
556 | void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx, | |
557 | int r) { | |
11fdf7f2 | 558 | dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; |
31f18b77 FG |
559 | |
560 | Context *on_start = nullptr; | |
561 | { | |
9f95a23c | 562 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
563 | ceph_assert(sync_ctx->req != nullptr); |
564 | ceph_assert(sync_ctx->on_start != nullptr); | |
31f18b77 FG |
565 | |
566 | if (sync_ctx->req->canceling) { | |
567 | r = -ECANCELED; | |
568 | } | |
569 | ||
570 | std::swap(sync_ctx->on_start, on_start); | |
571 | sync_ctx->req = nullptr; | |
28e407b8 AA |
572 | |
573 | if (r == -ECANCELED) { | |
574 | notify_sync_complete(m_lock, sync_ctx->sync_id); | |
575 | } | |
31f18b77 FG |
576 | } |
577 | ||
578 | on_start->complete(r == -ECANCELED ? r : 0); | |
31f18b77 FG |
579 | } |
580 | ||
581 | template <typename I> | |
582 | void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx, | |
583 | int r) { | |
11fdf7f2 | 584 | dout(10) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; |
31f18b77 FG |
585 | |
586 | if (sync_ctx->on_complete != nullptr) { | |
587 | sync_ctx->on_complete->complete(r); | |
588 | } | |
589 | } | |
590 | ||
31f18b77 FG |
591 | template <typename I> |
592 | void InstanceWatcher<I>::handle_acquire_leader() { | |
11fdf7f2 | 593 | dout(10) << dendl; |
31f18b77 | 594 | |
9f95a23c | 595 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
596 | |
597 | m_leader_instance_id = m_instance_id; | |
598 | unsuspend_notify_requests(); | |
599 | } | |
600 | ||
601 | template <typename I> | |
602 | void InstanceWatcher<I>::handle_release_leader() { | |
11fdf7f2 | 603 | dout(10) << dendl; |
31f18b77 | 604 | |
9f95a23c | 605 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
606 | |
607 | m_leader_instance_id.clear(); | |
608 | ||
9f95a23c | 609 | m_image_sync_throttler->drain(m_ioctx.get_namespace(), -ESTALE); |
31f18b77 FG |
610 | } |
611 | ||
612 | template <typename I> | |
613 | void InstanceWatcher<I>::handle_update_leader( | |
614 | const std::string &leader_instance_id) { | |
11fdf7f2 | 615 | dout(10) << "leader_instance_id=" << leader_instance_id << dendl; |
31f18b77 | 616 | |
9f95a23c | 617 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
618 | |
619 | m_leader_instance_id = leader_instance_id; | |
620 | ||
621 | if (!m_leader_instance_id.empty()) { | |
622 | unsuspend_notify_requests(); | |
623 | } | |
624 | } | |
625 | ||
7c673cae FG |
626 | template <typename I> |
627 | void InstanceWatcher<I>::cancel_notify_requests( | |
628 | const std::string &instance_id) { | |
11fdf7f2 | 629 | dout(10) << "instance_id=" << instance_id << dendl; |
7c673cae | 630 | |
9f95a23c | 631 | std::lock_guard locker{m_lock}; |
7c673cae FG |
632 | |
633 | for (auto op : m_notify_ops) { | |
31f18b77 | 634 | if (op.first == instance_id && !op.second->send_to_leader) { |
7c673cae FG |
635 | op.second->cancel(); |
636 | } | |
637 | } | |
638 | } | |
639 | ||
7c673cae FG |
640 | template <typename I> |
641 | void InstanceWatcher<I>::register_instance() { | |
9f95a23c | 642 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 643 | |
11fdf7f2 | 644 | dout(10) << dendl; |
7c673cae FG |
645 | |
646 | librados::ObjectWriteOperation op; | |
647 | librbd::cls_client::mirror_instances_add(&op, m_instance_id); | |
648 | librados::AioCompletion *aio_comp = create_rados_callback< | |
649 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this); | |
650 | ||
651 | int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); | |
11fdf7f2 | 652 | ceph_assert(r == 0); |
7c673cae FG |
653 | aio_comp->release(); |
654 | } | |
655 | ||
656 | template <typename I> | |
657 | void InstanceWatcher<I>::handle_register_instance(int r) { | |
11fdf7f2 | 658 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
659 | |
660 | Context *on_finish = nullptr; | |
661 | { | |
9f95a23c | 662 | std::lock_guard locker{m_lock}; |
7c673cae FG |
663 | |
664 | if (r == 0) { | |
665 | create_instance_object(); | |
666 | return; | |
667 | } | |
668 | ||
669 | derr << "error registering instance: " << cpp_strerror(r) << dendl; | |
670 | ||
671 | std::swap(on_finish, m_on_finish); | |
672 | } | |
673 | on_finish->complete(r); | |
674 | } | |
675 | ||
676 | ||
677 | template <typename I> | |
678 | void InstanceWatcher<I>::create_instance_object() { | |
11fdf7f2 | 679 | dout(10) << dendl; |
7c673cae | 680 | |
9f95a23c | 681 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
682 | |
683 | librados::ObjectWriteOperation op; | |
684 | op.create(true); | |
685 | ||
686 | librados::AioCompletion *aio_comp = create_rados_callback< | |
687 | InstanceWatcher<I>, | |
688 | &InstanceWatcher<I>::handle_create_instance_object>(this); | |
689 | int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); | |
11fdf7f2 | 690 | ceph_assert(r == 0); |
7c673cae FG |
691 | aio_comp->release(); |
692 | } | |
693 | ||
694 | template <typename I> | |
695 | void InstanceWatcher<I>::handle_create_instance_object(int r) { | |
11fdf7f2 | 696 | dout(10) << "r=" << r << dendl; |
7c673cae | 697 | |
9f95a23c | 698 | std::lock_guard locker{m_lock}; |
7c673cae FG |
699 | |
700 | if (r < 0) { | |
701 | derr << "error creating " << m_oid << " object: " << cpp_strerror(r) | |
702 | << dendl; | |
703 | ||
704 | m_ret_val = r; | |
705 | unregister_instance(); | |
706 | return; | |
707 | } | |
708 | ||
709 | register_watch(); | |
710 | } | |
711 | ||
712 | template <typename I> | |
713 | void InstanceWatcher<I>::register_watch() { | |
11fdf7f2 | 714 | dout(10) << dendl; |
7c673cae | 715 | |
9f95a23c | 716 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
717 | |
718 | Context *ctx = create_async_context_callback( | |
719 | m_work_queue, create_context_callback< | |
720 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this)); | |
721 | ||
722 | librbd::Watcher::register_watch(ctx); | |
723 | } | |
724 | ||
725 | template <typename I> | |
726 | void InstanceWatcher<I>::handle_register_watch(int r) { | |
11fdf7f2 | 727 | dout(10) << "r=" << r << dendl; |
7c673cae | 728 | |
9f95a23c | 729 | std::lock_guard locker{m_lock}; |
7c673cae FG |
730 | |
731 | if (r < 0) { | |
732 | derr << "error registering instance watcher for " << m_oid << " object: " | |
733 | << cpp_strerror(r) << dendl; | |
734 | ||
735 | m_ret_val = r; | |
736 | remove_instance_object(); | |
737 | return; | |
738 | } | |
739 | ||
740 | acquire_lock(); | |
741 | } | |
742 | ||
743 | template <typename I> | |
744 | void InstanceWatcher<I>::acquire_lock() { | |
11fdf7f2 | 745 | dout(10) << dendl; |
7c673cae | 746 | |
9f95a23c | 747 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
748 | |
749 | Context *ctx = create_async_context_callback( | |
750 | m_work_queue, create_context_callback< | |
751 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this)); | |
752 | ||
753 | m_instance_lock->acquire_lock(ctx); | |
754 | } | |
755 | ||
756 | template <typename I> | |
757 | void InstanceWatcher<I>::handle_acquire_lock(int r) { | |
11fdf7f2 | 758 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
759 | |
760 | Context *on_finish = nullptr; | |
761 | { | |
9f95a23c | 762 | std::lock_guard locker{m_lock}; |
7c673cae FG |
763 | |
764 | if (r < 0) { | |
765 | ||
766 | derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl; | |
767 | ||
768 | m_ret_val = r; | |
769 | unregister_watch(); | |
770 | return; | |
771 | } | |
772 | ||
773 | std::swap(on_finish, m_on_finish); | |
774 | } | |
775 | ||
776 | on_finish->complete(r); | |
777 | } | |
778 | ||
779 | template <typename I> | |
780 | void InstanceWatcher<I>::release_lock() { | |
11fdf7f2 | 781 | dout(10) << dendl; |
7c673cae | 782 | |
9f95a23c | 783 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
784 | |
785 | Context *ctx = create_async_context_callback( | |
786 | m_work_queue, create_context_callback< | |
787 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this)); | |
788 | ||
789 | m_instance_lock->shut_down(ctx); | |
790 | } | |
791 | ||
792 | template <typename I> | |
793 | void InstanceWatcher<I>::handle_release_lock(int r) { | |
11fdf7f2 | 794 | dout(10) << "r=" << r << dendl; |
7c673cae | 795 | |
9f95a23c | 796 | std::lock_guard locker{m_lock}; |
7c673cae FG |
797 | |
798 | if (r < 0) { | |
799 | derr << "error releasing instance lock: " << cpp_strerror(r) << dendl; | |
800 | } | |
801 | ||
802 | unregister_watch(); | |
803 | } | |
804 | ||
805 | template <typename I> | |
806 | void InstanceWatcher<I>::unregister_watch() { | |
11fdf7f2 | 807 | dout(10) << dendl; |
7c673cae | 808 | |
9f95a23c | 809 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
810 | |
811 | Context *ctx = create_async_context_callback( | |
812 | m_work_queue, create_context_callback< | |
813 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this)); | |
814 | ||
815 | librbd::Watcher::unregister_watch(ctx); | |
816 | } | |
817 | ||
818 | template <typename I> | |
819 | void InstanceWatcher<I>::handle_unregister_watch(int r) { | |
11fdf7f2 | 820 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
821 | |
822 | if (r < 0) { | |
823 | derr << "error unregistering instance watcher for " << m_oid << " object: " | |
824 | << cpp_strerror(r) << dendl; | |
825 | } | |
826 | ||
9f95a23c | 827 | std::lock_guard locker{m_lock}; |
7c673cae FG |
828 | remove_instance_object(); |
829 | } | |
830 | ||
831 | template <typename I> | |
832 | void InstanceWatcher<I>::remove_instance_object() { | |
9f95a23c | 833 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 834 | |
11fdf7f2 | 835 | dout(10) << dendl; |
7c673cae FG |
836 | |
837 | librados::ObjectWriteOperation op; | |
838 | op.remove(); | |
839 | ||
840 | librados::AioCompletion *aio_comp = create_rados_callback< | |
841 | InstanceWatcher<I>, | |
842 | &InstanceWatcher<I>::handle_remove_instance_object>(this); | |
843 | int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); | |
11fdf7f2 | 844 | ceph_assert(r == 0); |
7c673cae FG |
845 | aio_comp->release(); |
846 | } | |
847 | ||
848 | template <typename I> | |
849 | void InstanceWatcher<I>::handle_remove_instance_object(int r) { | |
11fdf7f2 | 850 | dout(10) << "r=" << r << dendl; |
7c673cae | 851 | |
11fdf7f2 | 852 | if (r == -ENOENT) { |
7c673cae FG |
853 | r = 0; |
854 | } | |
855 | ||
856 | if (r < 0) { | |
857 | derr << "error removing " << m_oid << " object: " << cpp_strerror(r) | |
858 | << dendl; | |
859 | } | |
860 | ||
9f95a23c | 861 | std::lock_guard locker{m_lock}; |
7c673cae FG |
862 | unregister_instance(); |
863 | } | |
864 | ||
865 | template <typename I> | |
866 | void InstanceWatcher<I>::unregister_instance() { | |
11fdf7f2 | 867 | dout(10) << dendl; |
7c673cae | 868 | |
9f95a23c | 869 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
870 | |
871 | librados::ObjectWriteOperation op; | |
872 | librbd::cls_client::mirror_instances_remove(&op, m_instance_id); | |
873 | librados::AioCompletion *aio_comp = create_rados_callback< | |
874 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this); | |
875 | ||
876 | int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); | |
11fdf7f2 | 877 | ceph_assert(r == 0); |
7c673cae FG |
878 | aio_comp->release(); |
879 | } | |
880 | ||
881 | template <typename I> | |
882 | void InstanceWatcher<I>::handle_unregister_instance(int r) { | |
11fdf7f2 | 883 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
884 | |
885 | if (r < 0) { | |
886 | derr << "error unregistering instance: " << cpp_strerror(r) << dendl; | |
887 | } | |
888 | ||
9f95a23c | 889 | std::lock_guard locker{m_lock}; |
7c673cae FG |
890 | wait_for_notify_ops(); |
891 | } | |
892 | ||
893 | template <typename I> | |
894 | void InstanceWatcher<I>::wait_for_notify_ops() { | |
11fdf7f2 | 895 | dout(10) << dendl; |
7c673cae | 896 | |
9f95a23c | 897 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
898 | |
899 | for (auto op : m_notify_ops) { | |
900 | op.second->cancel(); | |
901 | } | |
902 | ||
903 | Context *ctx = create_async_context_callback( | |
904 | m_work_queue, create_context_callback< | |
905 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this)); | |
906 | ||
907 | m_notify_op_tracker.wait_for_ops(ctx); | |
908 | } | |
909 | ||
910 | template <typename I> | |
911 | void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) { | |
11fdf7f2 | 912 | dout(10) << "r=" << r << dendl; |
7c673cae | 913 | |
11fdf7f2 | 914 | ceph_assert(r == 0); |
7c673cae FG |
915 | |
916 | Context *on_finish = nullptr; | |
917 | { | |
9f95a23c | 918 | std::lock_guard locker{m_lock}; |
7c673cae | 919 | |
11fdf7f2 | 920 | ceph_assert(m_notify_ops.empty()); |
7c673cae FG |
921 | |
922 | std::swap(on_finish, m_on_finish); | |
923 | r = m_ret_val; | |
7c673cae FG |
924 | } |
925 | on_finish->complete(r); | |
926 | } | |
927 | ||
928 | template <typename I> | |
929 | void InstanceWatcher<I>::get_instance_locker() { | |
11fdf7f2 | 930 | dout(10) << dendl; |
7c673cae | 931 | |
9f95a23c | 932 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
933 | |
934 | Context *ctx = create_async_context_callback( | |
935 | m_work_queue, create_context_callback< | |
936 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this)); | |
937 | ||
938 | m_instance_lock->get_locker(&m_instance_locker, ctx); | |
939 | } | |
940 | ||
941 | template <typename I> | |
942 | void InstanceWatcher<I>::handle_get_instance_locker(int r) { | |
11fdf7f2 | 943 | dout(10) << "r=" << r << dendl; |
7c673cae | 944 | |
9f95a23c | 945 | std::lock_guard locker{m_lock}; |
7c673cae FG |
946 | |
947 | if (r < 0) { | |
948 | if (r != -ENOENT) { | |
949 | derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl; | |
950 | } | |
951 | remove_instance_object(); | |
952 | return; | |
953 | } | |
954 | ||
955 | break_instance_lock(); | |
956 | } | |
957 | ||
958 | template <typename I> | |
959 | void InstanceWatcher<I>::break_instance_lock() { | |
11fdf7f2 | 960 | dout(10) << dendl; |
7c673cae | 961 | |
9f95a23c | 962 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
963 | |
964 | Context *ctx = create_async_context_callback( | |
965 | m_work_queue, create_context_callback< | |
966 | InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this)); | |
967 | ||
968 | m_instance_lock->break_lock(m_instance_locker, true, ctx); | |
969 | } | |
970 | ||
971 | template <typename I> | |
972 | void InstanceWatcher<I>::handle_break_instance_lock(int r) { | |
11fdf7f2 | 973 | dout(10) << "r=" << r << dendl; |
7c673cae | 974 | |
9f95a23c | 975 | std::lock_guard locker{m_lock}; |
7c673cae FG |
976 | |
977 | if (r < 0) { | |
978 | if (r != -ENOENT) { | |
979 | derr << "error breaking instance lock: " << cpp_strerror(r) << dendl; | |
980 | } | |
981 | remove_instance_object(); | |
982 | return; | |
983 | } | |
984 | ||
985 | remove_instance_object(); | |
986 | } | |
987 | ||
31f18b77 FG |
988 | template <typename I> |
989 | void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) { | |
11fdf7f2 | 990 | dout(10) << req << dendl; |
31f18b77 | 991 | |
9f95a23c | 992 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
993 | |
994 | auto result = m_suspended_ops.insert(req).second; | |
11fdf7f2 | 995 | ceph_assert(result); |
31f18b77 FG |
996 | } |
997 | ||
998 | template <typename I> | |
999 | bool InstanceWatcher<I>::unsuspend_notify_request( | |
1000 | C_NotifyInstanceRequest *req) { | |
11fdf7f2 | 1001 | dout(10) << req << dendl; |
31f18b77 | 1002 | |
9f95a23c | 1003 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
1004 | |
1005 | auto result = m_suspended_ops.erase(req); | |
1006 | if (result == 0) { | |
1007 | return false; | |
1008 | } | |
1009 | ||
1010 | req->send(); | |
1011 | return true; | |
1012 | } | |
1013 | ||
1014 | template <typename I> | |
1015 | void InstanceWatcher<I>::unsuspend_notify_requests() { | |
11fdf7f2 | 1016 | dout(10) << dendl; |
31f18b77 | 1017 | |
9f95a23c | 1018 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
31f18b77 FG |
1019 | |
1020 | std::set<C_NotifyInstanceRequest *> suspended_ops; | |
1021 | std::swap(m_suspended_ops, suspended_ops); | |
1022 | ||
1023 | for (auto op : suspended_ops) { | |
1024 | op->send(); | |
1025 | } | |
1026 | } | |
1027 | ||
7c673cae FG |
1028 | template <typename I> |
1029 | Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id, | |
1030 | uint64_t request_id, | |
1031 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1032 | dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id |
7c673cae FG |
1033 | << dendl; |
1034 | ||
9f95a23c | 1035 | std::lock_guard locker{m_lock}; |
7c673cae FG |
1036 | |
1037 | Context *ctx = nullptr; | |
1038 | Request request(instance_id, request_id); | |
1039 | auto it = m_requests.find(request); | |
1040 | ||
1041 | if (it != m_requests.end()) { | |
11fdf7f2 | 1042 | dout(10) << "duplicate for in-progress request" << dendl; |
7c673cae FG |
1043 | delete it->on_notify_ack; |
1044 | m_requests.erase(it); | |
1045 | } else { | |
31f18b77 | 1046 | ctx = create_async_context_callback( |
9f95a23c | 1047 | m_work_queue, new LambdaContext( |
31f18b77 FG |
1048 | [this, instance_id, request_id] (int r) { |
1049 | complete_request(instance_id, request_id, r); | |
1050 | })); | |
7c673cae FG |
1051 | } |
1052 | ||
1053 | request.on_notify_ack = on_notify_ack; | |
1054 | m_requests.insert(request); | |
1055 | return ctx; | |
1056 | } | |
1057 | ||
31f18b77 FG |
1058 | template <typename I> |
1059 | void InstanceWatcher<I>::complete_request(const std::string &instance_id, | |
1060 | uint64_t request_id, int r) { | |
11fdf7f2 | 1061 | dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id |
31f18b77 FG |
1062 | << dendl; |
1063 | ||
1064 | C_NotifyAck *on_notify_ack; | |
1065 | { | |
9f95a23c | 1066 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
1067 | Request request(instance_id, request_id); |
1068 | auto it = m_requests.find(request); | |
11fdf7f2 | 1069 | ceph_assert(it != m_requests.end()); |
31f18b77 FG |
1070 | on_notify_ack = it->on_notify_ack; |
1071 | m_requests.erase(it); | |
1072 | } | |
1073 | ||
11fdf7f2 | 1074 | encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out); |
31f18b77 FG |
1075 | on_notify_ack->complete(0); |
1076 | } | |
1077 | ||
7c673cae FG |
1078 | template <typename I> |
1079 | void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle, | |
1080 | uint64_t notifier_id, bufferlist &bl) { | |
11fdf7f2 | 1081 | dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", " |
7c673cae FG |
1082 | << "notifier_id=" << notifier_id << dendl; |
1083 | ||
1084 | auto ctx = new C_NotifyAck(this, notify_id, handle); | |
1085 | ||
1086 | NotifyMessage notify_message; | |
1087 | try { | |
11fdf7f2 TL |
1088 | auto iter = bl.cbegin(); |
1089 | decode(notify_message, iter); | |
7c673cae FG |
1090 | } catch (const buffer::error &err) { |
1091 | derr << "error decoding image notification: " << err.what() << dendl; | |
1092 | ctx->complete(0); | |
1093 | return; | |
1094 | } | |
1095 | ||
1096 | apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx), | |
1097 | notify_message.payload); | |
1098 | } | |
1099 | ||
1100 | template <typename I> | |
1101 | void InstanceWatcher<I>::handle_image_acquire( | |
d2e6a577 | 1102 | const std::string &global_image_id, Context *on_finish) { |
11fdf7f2 | 1103 | dout(10) << "global_image_id=" << global_image_id << dendl; |
7c673cae | 1104 | |
9f95a23c | 1105 | auto ctx = new LambdaContext( |
d2e6a577 FG |
1106 | [this, global_image_id, on_finish] (int r) { |
1107 | m_instance_replayer->acquire_image(this, global_image_id, on_finish); | |
31f18b77 FG |
1108 | m_notify_op_tracker.finish_op(); |
1109 | }); | |
1110 | ||
1111 | m_notify_op_tracker.start_op(); | |
1112 | m_work_queue->queue(ctx, 0); | |
7c673cae FG |
1113 | } |
1114 | ||
1115 | template <typename I> | |
1116 | void InstanceWatcher<I>::handle_image_release( | |
d2e6a577 | 1117 | const std::string &global_image_id, Context *on_finish) { |
11fdf7f2 | 1118 | dout(10) << "global_image_id=" << global_image_id << dendl; |
7c673cae | 1119 | |
9f95a23c | 1120 | auto ctx = new LambdaContext( |
d2e6a577 FG |
1121 | [this, global_image_id, on_finish] (int r) { |
1122 | m_instance_replayer->release_image(global_image_id, on_finish); | |
1123 | m_notify_op_tracker.finish_op(); | |
1124 | }); | |
1125 | ||
1126 | m_notify_op_tracker.start_op(); | |
1127 | m_work_queue->queue(ctx, 0); | |
1128 | } | |
1129 | ||
1130 | template <typename I> | |
1131 | void InstanceWatcher<I>::handle_peer_image_removed( | |
1132 | const std::string &global_image_id, const std::string &peer_mirror_uuid, | |
1133 | Context *on_finish) { | |
11fdf7f2 | 1134 | dout(10) << "global_image_id=" << global_image_id << ", " |
d2e6a577 FG |
1135 | << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; |
1136 | ||
9f95a23c | 1137 | auto ctx = new LambdaContext( |
d2e6a577 FG |
1138 | [this, peer_mirror_uuid, global_image_id, on_finish] (int r) { |
1139 | m_instance_replayer->remove_peer_image(global_image_id, | |
1140 | peer_mirror_uuid, on_finish); | |
31f18b77 FG |
1141 | m_notify_op_tracker.finish_op(); |
1142 | }); | |
1143 | ||
1144 | m_notify_op_tracker.start_op(); | |
1145 | m_work_queue->queue(ctx, 0); | |
1146 | } | |
1147 | ||
1148 | template <typename I> | |
1149 | void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id, | |
1150 | const std::string &sync_id, | |
1151 | Context *on_finish) { | |
11fdf7f2 | 1152 | dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; |
31f18b77 | 1153 | |
9f95a23c | 1154 | std::lock_guard locker{m_lock}; |
31f18b77 | 1155 | |
9f95a23c | 1156 | if (!is_leader()) { |
11fdf7f2 | 1157 | dout(10) << "sync request for non-leader" << dendl; |
31f18b77 FG |
1158 | m_work_queue->queue(on_finish, -ESTALE); |
1159 | return; | |
1160 | } | |
1161 | ||
1162 | Context *on_start = create_async_context_callback( | |
9f95a23c | 1163 | m_work_queue, new LambdaContext( |
31f18b77 | 1164 | [this, instance_id, sync_id, on_finish] (int r) { |
11fdf7f2 | 1165 | dout(10) << "handle_sync_request: finish: instance_id=" << instance_id |
31f18b77 FG |
1166 | << ", sync_id=" << sync_id << ", r=" << r << dendl; |
1167 | if (r == 0) { | |
1168 | notify_sync_start(instance_id, sync_id); | |
1169 | } | |
494da23a TL |
1170 | if (r == -ENOENT) { |
1171 | r = 0; | |
1172 | } | |
31f18b77 FG |
1173 | on_finish->complete(r); |
1174 | })); | |
9f95a23c | 1175 | m_image_sync_throttler->start_op(m_ioctx.get_namespace(), sync_id, on_start); |
31f18b77 FG |
1176 | } |
1177 | ||
1178 | template <typename I> | |
1179 | void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id, | |
1180 | const std::string &sync_id, | |
1181 | Context *on_finish) { | |
11fdf7f2 | 1182 | dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; |
31f18b77 | 1183 | |
9f95a23c | 1184 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
1185 | |
1186 | auto it = m_inflight_sync_reqs.find(sync_id); | |
1187 | if (it == m_inflight_sync_reqs.end()) { | |
11fdf7f2 | 1188 | dout(5) << "not found" << dendl; |
31f18b77 FG |
1189 | m_work_queue->queue(on_finish, 0); |
1190 | return; | |
1191 | } | |
1192 | ||
1193 | auto sync_ctx = it->second; | |
1194 | ||
1195 | if (sync_ctx->on_complete != nullptr) { | |
11fdf7f2 | 1196 | dout(5) << "duplicate request" << dendl; |
31f18b77 FG |
1197 | m_work_queue->queue(sync_ctx->on_complete, -ESTALE); |
1198 | } | |
1199 | ||
1200 | sync_ctx->on_complete = on_finish; | |
7c673cae FG |
1201 | } |
1202 | ||
1203 | template <typename I> | |
1204 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1205 | const ImageAcquirePayload &payload, | |
1206 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1207 | dout(10) << "image_acquire: instance_id=" << instance_id << ", " |
7c673cae FG |
1208 | << "request_id=" << payload.request_id << dendl; |
1209 | ||
1210 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1211 | on_notify_ack); | |
1212 | if (on_finish != nullptr) { | |
d2e6a577 | 1213 | handle_image_acquire(payload.global_image_id, on_finish); |
7c673cae FG |
1214 | } |
1215 | } | |
1216 | ||
1217 | template <typename I> | |
1218 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1219 | const ImageReleasePayload &payload, | |
1220 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1221 | dout(10) << "image_release: instance_id=" << instance_id << ", " |
7c673cae FG |
1222 | << "request_id=" << payload.request_id << dendl; |
1223 | ||
1224 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1225 | on_notify_ack); | |
1226 | if (on_finish != nullptr) { | |
d2e6a577 FG |
1227 | handle_image_release(payload.global_image_id, on_finish); |
1228 | } | |
1229 | } | |
1230 | ||
1231 | template <typename I> | |
1232 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1233 | const PeerImageRemovedPayload &payload, | |
1234 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1235 | dout(10) << "remove_peer_image: instance_id=" << instance_id << ", " |
d2e6a577 FG |
1236 | << "request_id=" << payload.request_id << dendl; |
1237 | ||
1238 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1239 | on_notify_ack); | |
1240 | if (on_finish != nullptr) { | |
1241 | handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid, | |
1242 | on_finish); | |
7c673cae FG |
1243 | } |
1244 | } | |
1245 | ||
31f18b77 FG |
1246 | template <typename I> |
1247 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1248 | const SyncRequestPayload &payload, | |
1249 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1250 | dout(10) << "sync_request: instance_id=" << instance_id << ", " |
31f18b77 FG |
1251 | << "request_id=" << payload.request_id << dendl; |
1252 | ||
1253 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1254 | on_notify_ack); | |
1255 | if (on_finish == nullptr) { | |
1256 | return; | |
1257 | } | |
1258 | ||
1259 | handle_sync_request(instance_id, payload.sync_id, on_finish); | |
1260 | } | |
1261 | ||
1262 | template <typename I> | |
1263 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1264 | const SyncStartPayload &payload, | |
1265 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1266 | dout(10) << "sync_start: instance_id=" << instance_id << ", " |
31f18b77 FG |
1267 | << "request_id=" << payload.request_id << dendl; |
1268 | ||
1269 | auto on_finish = prepare_request(instance_id, payload.request_id, | |
1270 | on_notify_ack); | |
1271 | if (on_finish == nullptr) { | |
1272 | return; | |
1273 | } | |
1274 | ||
1275 | handle_sync_start(instance_id, payload.sync_id, on_finish); | |
1276 | } | |
1277 | ||
7c673cae FG |
1278 | template <typename I> |
1279 | void InstanceWatcher<I>::handle_payload(const std::string &instance_id, | |
1280 | const UnknownPayload &payload, | |
1281 | C_NotifyAck *on_notify_ack) { | |
11fdf7f2 | 1282 | dout(5) << "unknown: instance_id=" << instance_id << dendl; |
7c673cae FG |
1283 | |
1284 | on_notify_ack->complete(0); | |
1285 | } | |
1286 | ||
1287 | } // namespace mirror | |
1288 | } // namespace rbd | |
1289 | ||
1290 | template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>; |