// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/stringify.h"
#include "common/Timer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "librbd/Utils.h"
#include "ImageReplayer.h"
#include "InstanceReplayer.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
                           << this << " " << __func__ << ": "

namespace rbd {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");

} // anonymous namespace

using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;

template <typename I>
InstanceReplayer<I>::InstanceReplayer(
    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
    ImageDeleter<I>* image_deleter, RadosRef local_rados,
    const std::string &local_mirror_uuid, int64_t local_pool_id)
  : m_threads(threads), m_service_daemon(service_daemon),
    m_image_deleter(image_deleter), m_local_rados(local_rados),
    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}

template <typename I>
InstanceReplayer<I>::~InstanceReplayer() {
  assert(m_image_state_check_task == nullptr);
  assert(m_async_op_tracker.empty());
  assert(m_image_replayers.empty());
}

template <typename I>
int InstanceReplayer<I>::init() {
  C_SaferCond init_ctx;
  init(&init_ctx);
  return init_ctx.wait();
}

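// Asynchronously initialize the replayer: schedule the periodic image state
// check task under the timer lock, then complete on_finish via the work queue.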
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
  dout(20) << dendl;

  Context *ctx = new FunctionContext(
    [this, on_finish] (int r) {
      {
        Mutex::Locker timer_locker(m_threads->timer_lock);
        schedule_image_state_check_task();
      }
      on_finish->complete(0);
    });

  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void InstanceReplayer<I>::shut_down() {
  C_SaferCond shut_down_ctx;
  shut_down(&shut_down_ctx);
  int r = shut_down_ctx.wait();
  assert(r == 0);
}

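// Asynchronously shut down: record the completion context, cancel the periodic
// image state check task, and wait for in-flight async ops before stopping and
// destroying all image replayers.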
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  assert(m_on_shut_down == nullptr);
  m_on_shut_down = on_finish;

  Context *ctx = new FunctionContext(
    [this] (int r) {
      cancel_image_state_check_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}

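// Register a remote peer (mirror uuid + IoCtx) to replay from; registering the
// same peer twice is a programming error.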
template <typename I>
void InstanceReplayer<I>::add_peer(std::string peer_uuid,
                                   librados::IoCtx io_ctx) {
  dout(20) << peer_uuid << dendl;

  Mutex::Locker locker(m_lock);
  auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
  assert(result);
}

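// Stop and destroy every image replayer owned by this instance, completing
// on_finish once all of them have stopped (tracked via a C_Gather).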
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
       it = m_image_replayers.erase(it)) {
    auto image_replayer = it->second;
    auto ctx = gather_ctx->new_sub();
    ctx = new FunctionContext(
      [image_replayer, ctx] (int r) {
        image_replayer->destroy();
        ctx->complete(0);
      });
    stop_image_replayer(image_replayer, ctx);
  }
  gather_ctx->activate();
}

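// Take ownership of a mirrored image: create an ImageReplayer for the global
// image id if one does not already exist, attach the (single) peer, and
// (re)start replay before completing on_finish on the work queue.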
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                        const std::string &global_image_id,
                                        Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << dendl;

  Mutex::Locker locker(m_lock);

  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    auto image_replayer = ImageReplayer<I>::create(
      m_threads, m_image_deleter, instance_watcher, m_local_rados,
      m_local_mirror_uuid, m_local_pool_id, global_image_id);

    dout(20) << global_image_id << ": creating replayer " << image_replayer
             << dendl;

    it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                 image_replayer)).first;

    // TODO only a single peer is currently supported
    assert(m_peers.size() == 1);
    auto peer = *m_peers.begin();
    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
  }

  auto& image_replayer = it->second;
  // TODO temporary until policy integrated
  image_replayer->set_finished(false);

  start_image_replayer(image_replayer);
  m_threads->work_queue->queue(on_finish, 0);
}

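// Drop ownership of a mirrored image: remove its ImageReplayer from the map,
// stop it, and destroy it once stopped.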
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                        Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << dendl;

  Mutex::Locker locker(m_lock);
  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    dout(20) << global_image_id << ": not found" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  auto image_replayer = it->second;
  m_image_replayers.erase(it);

  on_finish = new FunctionContext(
    [image_replayer, on_finish] (int r) {
      image_replayer->destroy();
      on_finish->complete(0);
    });
  stop_image_replayer(image_replayer, on_finish);
}

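// Handle notification that the peer's copy of the image was removed: restart
// the image replayer so it can detect the missing peer image and decide
// whether the deletion needs to be propagated locally.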
template <typename I>
void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
                                            const std::string &peer_mirror_uuid,
                                            Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << ", "
           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;

  Mutex::Locker locker(m_lock);
  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it != m_image_replayers.end()) {
    // TODO only a single peer is currently supported, therefore
    // we can just interrupt the current image replayer and
    // it will eventually detect that the peer image is missing and
    // determine if a delete propagation is required.
    auto image_replayer = it->second;
    image_replayer->restart();
  }
  m_threads->work_queue->queue(on_finish, 0);
}

template <typename I>
void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
  dout(20) << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker locker(m_lock);

  f->open_array_section("image_replayers");
  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->print_status(f, ss);
  }
  f->close_section();
}

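// Manual start/stop/restart of replay for all images: start() and restart()
// clear the manual-stop flag, stop() sets it.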
template <typename I>
void InstanceReplayer<I>::start()
{
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  m_manual_stop = false;

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->start(nullptr, true);
  }
}

template <typename I>
void InstanceReplayer<I>::stop()
{
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  m_manual_stop = true;

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->stop(nullptr, true);
  }
}

template <typename I>
void InstanceReplayer<I>::restart()
{
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  m_manual_stop = false;

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->restart();
  }
}

template <typename I>
void InstanceReplayer<I>::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker locker(m_lock);

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->flush();
  }
}

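// Start a single image replayer if it is currently stopped; blacklisted
// replayers are skipped and finished replayers are removed and destroyed.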
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
    ImageReplayer<I> *image_replayer) {
  assert(m_lock.is_locked());

  std::string global_image_id = image_replayer->get_global_image_id();
  dout(20) << "global_image_id=" << global_image_id << dendl;

  if (!image_replayer->is_stopped()) {
    return;
  } else if (image_replayer->is_blacklisted()) {
    derr << "blacklisted detected during image replay" << dendl;
    return;
  } else if (image_replayer->is_finished()) {
    // TODO temporary until policy integrated
    dout(5) << "removing image replayer for global_image_id="
            << global_image_id << dendl;
    m_image_replayers.erase(image_replayer->get_global_image_id());
    image_replayer->destroy();
    return;
  }

  image_replayer->start(nullptr, false);
}

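// Queue an asynchronous start_image_replayers() pass on the work queue,
// tracked by the async op tracker.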
template <typename I>
void InstanceReplayer<I>::queue_start_image_replayers() {
  dout(20) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
  m_async_op_tracker.start_op();
  m_threads->work_queue->queue(ctx, 0);
}

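// Periodic pass over all image replayers: restart any stopped replayers and
// report the assigned/warning/error image counts to the service daemon.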
template <typename I>
void InstanceReplayer<I>::start_image_replayers(int r) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  if (m_on_shut_down != nullptr) {
    return;
  }

  uint64_t image_count = 0;
  uint64_t warning_count = 0;
  uint64_t error_count = 0;
  for (auto it = m_image_replayers.begin();
       it != m_image_replayers.end();) {
    auto current_it(it);
    ++it;

    ++image_count;
    auto health_state = current_it->second->get_health_state();
    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
      ++warning_count;
    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
      ++error_count;
    }

    start_image_replayer(current_it->second);
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);

  m_async_op_tracker.finish_op();
}

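// Stop a single image replayer and complete on_finish once it is stopped.
// If the replayer is neither running nor stopped (i.e. in transition), retry
// after a short delay via the timer.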
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                              Context *on_finish) {
  dout(20) << image_replayer << " global_image_id="
           << image_replayer->get_global_image_id() << ", on_finish="
           << on_finish << dendl;

  if (image_replayer->is_stopped()) {
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  m_async_op_tracker.start_op();
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, new FunctionContext(
      [this, image_replayer, on_finish] (int r) {
        stop_image_replayer(image_replayer, on_finish);
        m_async_op_tracker.finish_op();
      }));

  if (image_replayer->is_running()) {
    image_replayer->stop(ctx, false);
  } else {
    int after = 1;
    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
             << after << " sec (task " << ctx << ")" << dendl;
    ctx = new FunctionContext(
      [this, after, ctx] (int r) {
        Mutex::Locker timer_locker(m_threads->timer_lock);
        m_threads->timer->add_event_after(after, ctx);
      });
    m_threads->work_queue->queue(ctx, 0);
  }
}

template <typename I>
void InstanceReplayer<I>::wait_for_ops() {
  dout(20) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);

  m_async_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void InstanceReplayer<I>::handle_wait_for_ops(int r) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  stop_image_replayers();
}

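// Shutdown continuation: stop all image replayers, then destroy them and
// complete the pending shutdown context in handle_stop_image_replayers().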
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
  dout(20) << dendl;

  assert(m_lock.is_locked());

  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
      &InstanceReplayer<I>::handle_stop_image_replayers>(this));

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
  for (auto &it : m_image_replayers) {
    stop_image_replayer(it.second, gather_ctx->new_sub());
  }
  gather_ctx->activate();
}

template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Context *on_finish = nullptr;
  {
    Mutex::Locker locker(m_lock);

    for (auto &it : m_image_replayers) {
      assert(it.second->is_stopped());
      it.second->destroy();
    }
    m_image_replayers.clear();

    assert(m_on_shut_down != nullptr);
    std::swap(on_finish, m_on_shut_down);
  }
  on_finish->complete(r);
}

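// Cancel the pending periodic image state check timer event, if one is
// scheduled.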
template <typename I>
void InstanceReplayer<I>::cancel_image_state_check_task() {
  Mutex::Locker timer_locker(m_threads->timer_lock);

  if (m_image_state_check_task == nullptr) {
    return;
  }

  dout(20) << m_image_state_check_task << dendl;
  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
  assert(canceled);
  m_image_state_check_task = nullptr;
}

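// Schedule the next periodic image state check; when the timer fires it
// re-schedules itself and queues a start_image_replayers() pass. The interval
// is controlled by the rbd_mirror_image_state_check_interval config option.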
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
  assert(m_threads->timer_lock.is_locked());
  assert(m_image_state_check_task == nullptr);

  m_image_state_check_task = new FunctionContext(
    [this](int r) {
      assert(m_threads->timer_lock.is_locked());
      m_image_state_check_task = nullptr;
      schedule_image_state_check_task();
      queue_start_image_replayers();
    });

  int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval;

  dout(20) << "scheduling image state check after " << after << " sec (task "
           << m_image_state_check_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_image_state_check_task);
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;