]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/stringify.h"
5#include "common/Timer.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "librbd/Utils.h"
9#include "ImageReplayer.h"
10#include "InstanceReplayer.h"
11#include "Threads.h"
12
13#define dout_context g_ceph_context
14#define dout_subsys ceph_subsys_rbd_mirror
15#undef dout_prefix
16#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
17 << this << " " << __func__ << ": "
18
19namespace rbd {
20namespace mirror {
21
22using librbd::util::create_async_context_callback;
23using librbd::util::create_context_callback;
24
25template <typename I>
26InstanceReplayer<I>::InstanceReplayer(
27 Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
28 ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
29 const std::string &local_mirror_uuid, int64_t local_pool_id)
30 : m_threads(threads), m_image_deleter(image_deleter),
31 m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
32 m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
33 m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
34}
35
36template <typename I>
37InstanceReplayer<I>::~InstanceReplayer() {
38 assert(m_image_state_check_task == nullptr);
39 assert(m_async_op_tracker.empty());
40 assert(m_image_replayers.empty());
41}
42
43template <typename I>
44int InstanceReplayer<I>::init() {
45 C_SaferCond init_ctx;
46 init(&init_ctx);
47 return init_ctx.wait();
48}
49
50template <typename I>
51void InstanceReplayer<I>::init(Context *on_finish) {
52 dout(20) << dendl;
53
54 Context *ctx = new FunctionContext(
55 [this, on_finish] (int r) {
56 {
57 Mutex::Locker timer_locker(m_threads->timer_lock);
58 schedule_image_state_check_task();
59 }
60 on_finish->complete(0);
61 });
62
63 m_threads->work_queue->queue(ctx, 0);
64}
65
66template <typename I>
67void InstanceReplayer<I>::shut_down() {
68 C_SaferCond shut_down_ctx;
69 shut_down(&shut_down_ctx);
70 int r = shut_down_ctx.wait();
71 assert(r == 0);
72}
73
74template <typename I>
75void InstanceReplayer<I>::shut_down(Context *on_finish) {
76 dout(20) << dendl;
77
78 Mutex::Locker locker(m_lock);
79
80 assert(m_on_shut_down == nullptr);
81 m_on_shut_down = on_finish;
82
83 Context *ctx = new FunctionContext(
84 [this] (int r) {
85 cancel_image_state_check_task();
86 wait_for_ops();
87 });
88
89 m_threads->work_queue->queue(ctx, 0);
90}
91
92template <typename I>
93void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
94 librados::IoCtx io_ctx) {
95 dout(20) << mirror_uuid << dendl;
96
97 Mutex::Locker locker(m_lock);
98 auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
99 assert(result);
100}
101
102template <typename I>
103void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
104 dout(20) << mirror_uuid << dendl;
105
106 Mutex::Locker locker(m_lock);
107 auto result = m_peers.erase(Peer(mirror_uuid));
108 assert(result > 0);
109}
110
111template <typename I>
112void InstanceReplayer<I>::release_all(Context *on_finish) {
113 dout(20) << dendl;
114
115 Mutex::Locker locker(m_lock);
116
117 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
118 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
119 it = m_image_replayers.erase(it)) {
120 auto image_replayer = it->second;
121 auto ctx = gather_ctx->new_sub();
122 ctx = new FunctionContext(
123 [image_replayer, ctx] (int r) {
124 image_replayer->destroy();
125 ctx->complete(0);
126 });
127 stop_image_replayer(image_replayer, ctx);
128 }
129 gather_ctx->activate();
130}
131
132template <typename I>
133void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
134 const std::string &peer_mirror_uuid,
135 const std::string &peer_image_id,
136 Context *on_finish) {
137 dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
138 << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
139
140 Mutex::Locker locker(m_lock);
141
142 assert(m_on_shut_down == nullptr);
143
144 auto it = m_image_replayers.find(global_image_id);
145
146 if (it == m_image_replayers.end()) {
147 auto image_replayer = ImageReplayer<I>::create(
148 m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
149 m_local_mirror_uuid, m_local_pool_id, global_image_id);
150
151 dout(20) << global_image_id << ": creating replayer " << image_replayer
152 << dendl;
153
154 it = m_image_replayers.insert(std::make_pair(global_image_id,
155 image_replayer)).first;
156 }
157
158 auto image_replayer = it->second;
159 if (!peer_mirror_uuid.empty()) {
160 auto iter = m_peers.find(Peer(peer_mirror_uuid));
161 assert(iter != m_peers.end());
162 auto io_ctx = iter->io_ctx;
163
164 image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
165 }
166 start_image_replayer(image_replayer);
167
168 m_threads->work_queue->queue(on_finish, 0);
169}
170
171template <typename I>
172void InstanceReplayer<I>::release_image(const std::string &global_image_id,
173 const std::string &peer_mirror_uuid,
174 const std::string &peer_image_id,
175 bool schedule_delete,
176 Context *on_finish) {
177 dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
178 << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
179
180 Mutex::Locker locker(m_lock);
181
182 assert(m_on_shut_down == nullptr);
183
184 auto it = m_image_replayers.find(global_image_id);
185
186 if (it == m_image_replayers.end()) {
187 dout(20) << global_image_id << ": not found" << dendl;
188 m_threads->work_queue->queue(on_finish, 0);
189 return;
190 }
191
192 auto image_replayer = it->second;
193 if (!peer_mirror_uuid.empty()) {
194 image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
195 schedule_delete);
196 }
197
198 if (!image_replayer->remote_images_empty()) {
199 dout(20) << global_image_id << ": still has peer images" << dendl;
200 m_threads->work_queue->queue(on_finish, 0);
201 return;
202 }
203
204 m_image_replayers.erase(it);
205
206 on_finish = new FunctionContext(
207 [image_replayer, on_finish] (int r) {
208 image_replayer->destroy();
209 on_finish->complete(0);
210 });
211
212 if (schedule_delete) {
213 on_finish = new FunctionContext(
214 [this, image_replayer, on_finish] (int r) {
215 auto global_image_id = image_replayer->get_global_image_id();
216 m_image_deleter->schedule_image_delete(
217 m_local_rados, m_local_pool_id, global_image_id);
218 on_finish->complete(0);
219 });
220 }
221
222 stop_image_replayer(image_replayer, on_finish);
223}
224
225template <typename I>
226void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
227 dout(20) << dendl;
228
229 if (!f) {
230 return;
231 }
232
233 Mutex::Locker locker(m_lock);
234
235 f->open_array_section("image_replayers");
236 for (auto &kv : m_image_replayers) {
237 auto &image_replayer = kv.second;
238 image_replayer->print_status(f, ss);
239 }
240 f->close_section();
241}
242
243template <typename I>
244void InstanceReplayer<I>::start()
245{
246 dout(20) << dendl;
247
248 Mutex::Locker locker(m_lock);
249
250 m_manual_stop = false;
251
252 for (auto &kv : m_image_replayers) {
253 auto &image_replayer = kv.second;
254 image_replayer->start(nullptr, true);
255 }
256}
257
258template <typename I>
259void InstanceReplayer<I>::stop()
260{
261 dout(20) << dendl;
262
263 Mutex::Locker locker(m_lock);
264
265 m_manual_stop = true;
266
267 for (auto &kv : m_image_replayers) {
268 auto &image_replayer = kv.second;
269 image_replayer->stop(nullptr, true);
270 }
271}
272
273template <typename I>
274void InstanceReplayer<I>::restart()
275{
276 dout(20) << dendl;
277
278 Mutex::Locker locker(m_lock);
279
280 m_manual_stop = false;
281
282 for (auto &kv : m_image_replayers) {
283 auto &image_replayer = kv.second;
284 image_replayer->restart();
285 }
286}
287
288template <typename I>
289void InstanceReplayer<I>::flush()
290{
291 dout(20) << "enter" << dendl;
292
293 Mutex::Locker locker(m_lock);
294
295 for (auto &kv : m_image_replayers) {
296 auto &image_replayer = kv.second;
297 image_replayer->flush();
298 }
299}
300
301template <typename I>
302void InstanceReplayer<I>::start_image_replayer(
303 ImageReplayer<I> *image_replayer) {
304 assert(m_lock.is_locked());
305
306 std::string global_image_id = image_replayer->get_global_image_id();
307 dout(20) << "global_image_id=" << global_image_id << dendl;
308
309 if (!image_replayer->is_stopped()) {
310 return;
311 } else if (image_replayer->is_blacklisted()) {
312 derr << "blacklisted detected during image replay" << dendl;
313 return;
314 }
315
316 FunctionContext *ctx = new FunctionContext(
317 [this, global_image_id] (int r) {
318 dout(20) << "image deleter result: r=" << r << ", "
319 << "global_image_id=" << global_image_id << dendl;
320
321 Mutex::Locker locker(m_lock);
322 m_async_op_tracker.finish_op();
323
324 if (r == -ESTALE || r == -ECANCELED) {
325 return;
326 }
327
328 auto it = m_image_replayers.find(global_image_id);
329 if (it == m_image_replayers.end()) {
330 return;
331 }
332
333 auto image_replayer = it->second;
334 if (r >= 0) {
335 image_replayer->start(nullptr, false);
336 } else {
337 start_image_replayer(image_replayer);
338 }
339 });
340
341 m_async_op_tracker.start_op();
342 m_image_deleter->wait_for_scheduled_deletion(
343 m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
344}
345
346template <typename I>
347void InstanceReplayer<I>::start_image_replayers() {
348 dout(20) << dendl;
349
350 Context *ctx = new FunctionContext(
351 [this] (int r) {
352 Mutex::Locker locker(m_lock);
353 m_async_op_tracker.finish_op();
354 if (m_on_shut_down != nullptr) {
355 return;
356 }
357 for (auto &it : m_image_replayers) {
358 start_image_replayer(it.second);
359 }
360 });
361
362 m_async_op_tracker.start_op();
363 m_threads->work_queue->queue(ctx, 0);
364}
365
366
367template <typename I>
368void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
369 Context *on_finish) {
370 dout(20) << image_replayer << " global_image_id="
371 << image_replayer->get_global_image_id() << ", on_finish="
372 << on_finish << dendl;
373
374 if (image_replayer->is_stopped()) {
375 m_threads->work_queue->queue(on_finish, 0);
376 return;
377 }
378
379 m_async_op_tracker.start_op();
380 Context *ctx = create_async_context_callback(
381 m_threads->work_queue, new FunctionContext(
382 [this, image_replayer, on_finish] (int r) {
383 stop_image_replayer(image_replayer, on_finish);
384 m_async_op_tracker.finish_op();
385 }));
386
387 if (image_replayer->is_running()) {
388 image_replayer->stop(ctx, false);
389 } else {
390 int after = 1;
391 dout(20) << "scheduling image replayer " << image_replayer << " stop after "
392 << after << " sec (task " << ctx << ")" << dendl;
393 ctx = new FunctionContext(
394 [this, after, ctx] (int r) {
395 Mutex::Locker timer_locker(m_threads->timer_lock);
396 m_threads->timer->add_event_after(after, ctx);
397 });
398 m_threads->work_queue->queue(ctx, 0);
399 }
400}
401
402template <typename I>
403void InstanceReplayer<I>::wait_for_ops() {
404 dout(20) << dendl;
405
406 Context *ctx = create_context_callback<
407 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
408
409 m_async_op_tracker.wait_for_ops(ctx);
410}
411
412template <typename I>
413void InstanceReplayer<I>::handle_wait_for_ops(int r) {
414 dout(20) << "r=" << r << dendl;
415
416 assert(r == 0);
417
418 Mutex::Locker locker(m_lock);
419 stop_image_replayers();
420}
421
422template <typename I>
423void InstanceReplayer<I>::stop_image_replayers() {
424 dout(20) << dendl;
425
426 assert(m_lock.is_locked());
427
428 Context *ctx = create_async_context_callback(
429 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
430 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
431
432 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
433 for (auto &it : m_image_replayers) {
434 stop_image_replayer(it.second, gather_ctx->new_sub());
435 }
436 gather_ctx->activate();
437}
438
439template <typename I>
440void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
441 dout(20) << "r=" << r << dendl;
442
443 assert(r == 0);
444
445 Context *on_finish = nullptr;
446 {
447 Mutex::Locker locker(m_lock);
448
449 for (auto &it : m_image_replayers) {
450 assert(it.second->is_stopped());
451 it.second->destroy();
452 }
453 m_image_replayers.clear();
454
455 assert(m_on_shut_down != nullptr);
456 std::swap(on_finish, m_on_shut_down);
457 }
458 on_finish->complete(r);
459}
460
461template <typename I>
462void InstanceReplayer<I>::cancel_image_state_check_task() {
463 Mutex::Locker timer_locker(m_threads->timer_lock);
464
465 if (m_image_state_check_task == nullptr) {
466 return;
467 }
468
469 dout(20) << m_image_state_check_task << dendl;
470 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
471 assert(canceled);
472 m_image_state_check_task = nullptr;
473}
474
475template <typename I>
476void InstanceReplayer<I>::schedule_image_state_check_task() {
477 assert(m_threads->timer_lock.is_locked());
478 assert(m_image_state_check_task == nullptr);
479
480 m_image_state_check_task = new FunctionContext(
481 [this](int r) {
482 assert(m_threads->timer_lock.is_locked());
483 m_image_state_check_task = nullptr;
484 schedule_image_state_check_task();
485 start_image_replayers();
486 });
487
488 int after =
489 max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval);
490
491 dout(20) << "scheduling image state check after " << after << " sec (task "
492 << m_image_state_check_task << ")" << dendl;
493 m_threads->timer->add_event_after(after, m_image_state_check_task);
494}
495
496} // namespace mirror
497} // namespace rbd
498
499template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;