]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/stringify.h" | |
5 | #include "common/Timer.h" | |
6 | #include "common/debug.h" | |
7 | #include "common/errno.h" | |
8 | #include "librbd/Utils.h" | |
9 | #include "ImageReplayer.h" | |
10 | #include "InstanceReplayer.h" | |
c07f9fc5 | 11 | #include "ServiceDaemon.h" |
7c673cae FG |
12 | #include "Threads.h" |
13 | ||
14 | #define dout_context g_ceph_context | |
15 | #define dout_subsys ceph_subsys_rbd_mirror | |
16 | #undef dout_prefix | |
17 | #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ | |
18 | << this << " " << __func__ << ": " | |
19 | ||
20 | namespace rbd { | |
21 | namespace mirror { | |
22 | ||
c07f9fc5 FG |
namespace {

// Attribute names passed to ServiceDaemon::add_or_update_attribute() when
// the per-pool image counters are refreshed in start_image_replayers().
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY = "image_assigned_count";
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY = "image_warning_count";
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY = "image_error_count";

} // anonymous namespace
30 | ||
7c673cae FG |
31 | using librbd::util::create_async_context_callback; |
32 | using librbd::util::create_context_callback; | |
33 | ||
34 | template <typename I> | |
35 | InstanceReplayer<I>::InstanceReplayer( | |
c07f9fc5 FG |
36 | Threads<I> *threads, ServiceDaemon<I>* service_daemon, |
37 | ImageDeleter<I>* image_deleter, RadosRef local_rados, | |
38 | const std::string &local_mirror_uuid, int64_t local_pool_id) | |
39 | : m_threads(threads), m_service_daemon(service_daemon), | |
40 | m_image_deleter(image_deleter), m_local_rados(local_rados), | |
41 | m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), | |
31f18b77 | 42 | m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { |
7c673cae FG |
43 | } |
44 | ||
45 | template <typename I> | |
46 | InstanceReplayer<I>::~InstanceReplayer() { | |
47 | assert(m_image_state_check_task == nullptr); | |
48 | assert(m_async_op_tracker.empty()); | |
49 | assert(m_image_replayers.empty()); | |
50 | } | |
51 | ||
52 | template <typename I> | |
53 | int InstanceReplayer<I>::init() { | |
54 | C_SaferCond init_ctx; | |
55 | init(&init_ctx); | |
56 | return init_ctx.wait(); | |
57 | } | |
58 | ||
59 | template <typename I> | |
60 | void InstanceReplayer<I>::init(Context *on_finish) { | |
61 | dout(20) << dendl; | |
62 | ||
63 | Context *ctx = new FunctionContext( | |
64 | [this, on_finish] (int r) { | |
65 | { | |
66 | Mutex::Locker timer_locker(m_threads->timer_lock); | |
67 | schedule_image_state_check_task(); | |
68 | } | |
69 | on_finish->complete(0); | |
70 | }); | |
71 | ||
72 | m_threads->work_queue->queue(ctx, 0); | |
73 | } | |
74 | ||
75 | template <typename I> | |
76 | void InstanceReplayer<I>::shut_down() { | |
77 | C_SaferCond shut_down_ctx; | |
78 | shut_down(&shut_down_ctx); | |
79 | int r = shut_down_ctx.wait(); | |
80 | assert(r == 0); | |
81 | } | |
82 | ||
83 | template <typename I> | |
84 | void InstanceReplayer<I>::shut_down(Context *on_finish) { | |
85 | dout(20) << dendl; | |
86 | ||
87 | Mutex::Locker locker(m_lock); | |
88 | ||
89 | assert(m_on_shut_down == nullptr); | |
90 | m_on_shut_down = on_finish; | |
91 | ||
92 | Context *ctx = new FunctionContext( | |
93 | [this] (int r) { | |
94 | cancel_image_state_check_task(); | |
95 | wait_for_ops(); | |
96 | }); | |
97 | ||
98 | m_threads->work_queue->queue(ctx, 0); | |
99 | } | |
100 | ||
101 | template <typename I> | |
102 | void InstanceReplayer<I>::add_peer(std::string mirror_uuid, | |
103 | librados::IoCtx io_ctx) { | |
104 | dout(20) << mirror_uuid << dendl; | |
105 | ||
106 | Mutex::Locker locker(m_lock); | |
107 | auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second; | |
108 | assert(result); | |
109 | } | |
110 | ||
111 | template <typename I> | |
112 | void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) { | |
113 | dout(20) << mirror_uuid << dendl; | |
114 | ||
115 | Mutex::Locker locker(m_lock); | |
116 | auto result = m_peers.erase(Peer(mirror_uuid)); | |
117 | assert(result > 0); | |
118 | } | |
119 | ||
120 | template <typename I> | |
121 | void InstanceReplayer<I>::release_all(Context *on_finish) { | |
122 | dout(20) << dendl; | |
123 | ||
124 | Mutex::Locker locker(m_lock); | |
125 | ||
126 | C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); | |
127 | for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); | |
128 | it = m_image_replayers.erase(it)) { | |
129 | auto image_replayer = it->second; | |
130 | auto ctx = gather_ctx->new_sub(); | |
131 | ctx = new FunctionContext( | |
132 | [image_replayer, ctx] (int r) { | |
133 | image_replayer->destroy(); | |
134 | ctx->complete(0); | |
135 | }); | |
136 | stop_image_replayer(image_replayer, ctx); | |
137 | } | |
138 | gather_ctx->activate(); | |
139 | } | |
140 | ||
141 | template <typename I> | |
31f18b77 FG |
142 | void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher, |
143 | const std::string &global_image_id, | |
7c673cae FG |
144 | const std::string &peer_mirror_uuid, |
145 | const std::string &peer_image_id, | |
146 | Context *on_finish) { | |
147 | dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid=" | |
148 | << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl; | |
149 | ||
150 | Mutex::Locker locker(m_lock); | |
151 | ||
152 | assert(m_on_shut_down == nullptr); | |
153 | ||
154 | auto it = m_image_replayers.find(global_image_id); | |
155 | ||
156 | if (it == m_image_replayers.end()) { | |
157 | auto image_replayer = ImageReplayer<I>::create( | |
31f18b77 FG |
158 | m_threads, m_image_deleter, instance_watcher, m_local_rados, |
159 | m_local_mirror_uuid, m_local_pool_id, global_image_id); | |
7c673cae FG |
160 | |
161 | dout(20) << global_image_id << ": creating replayer " << image_replayer | |
162 | << dendl; | |
163 | ||
164 | it = m_image_replayers.insert(std::make_pair(global_image_id, | |
165 | image_replayer)).first; | |
166 | } | |
167 | ||
168 | auto image_replayer = it->second; | |
169 | if (!peer_mirror_uuid.empty()) { | |
170 | auto iter = m_peers.find(Peer(peer_mirror_uuid)); | |
171 | assert(iter != m_peers.end()); | |
172 | auto io_ctx = iter->io_ctx; | |
173 | ||
174 | image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx); | |
175 | } | |
176 | start_image_replayer(image_replayer); | |
177 | ||
178 | m_threads->work_queue->queue(on_finish, 0); | |
179 | } | |
180 | ||
181 | template <typename I> | |
182 | void InstanceReplayer<I>::release_image(const std::string &global_image_id, | |
183 | const std::string &peer_mirror_uuid, | |
184 | const std::string &peer_image_id, | |
185 | bool schedule_delete, | |
186 | Context *on_finish) { | |
187 | dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid=" | |
188 | << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl; | |
189 | ||
190 | Mutex::Locker locker(m_lock); | |
191 | ||
192 | assert(m_on_shut_down == nullptr); | |
193 | ||
194 | auto it = m_image_replayers.find(global_image_id); | |
195 | ||
196 | if (it == m_image_replayers.end()) { | |
197 | dout(20) << global_image_id << ": not found" << dendl; | |
198 | m_threads->work_queue->queue(on_finish, 0); | |
199 | return; | |
200 | } | |
201 | ||
202 | auto image_replayer = it->second; | |
203 | if (!peer_mirror_uuid.empty()) { | |
204 | image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id, | |
205 | schedule_delete); | |
206 | } | |
207 | ||
208 | if (!image_replayer->remote_images_empty()) { | |
209 | dout(20) << global_image_id << ": still has peer images" << dendl; | |
210 | m_threads->work_queue->queue(on_finish, 0); | |
211 | return; | |
212 | } | |
213 | ||
214 | m_image_replayers.erase(it); | |
215 | ||
216 | on_finish = new FunctionContext( | |
217 | [image_replayer, on_finish] (int r) { | |
218 | image_replayer->destroy(); | |
219 | on_finish->complete(0); | |
220 | }); | |
221 | ||
222 | if (schedule_delete) { | |
223 | on_finish = new FunctionContext( | |
224 | [this, image_replayer, on_finish] (int r) { | |
225 | auto global_image_id = image_replayer->get_global_image_id(); | |
226 | m_image_deleter->schedule_image_delete( | |
c07f9fc5 | 227 | m_local_rados, m_local_pool_id, global_image_id, false); |
7c673cae FG |
228 | on_finish->complete(0); |
229 | }); | |
230 | } | |
231 | ||
232 | stop_image_replayer(image_replayer, on_finish); | |
233 | } | |
234 | ||
235 | template <typename I> | |
236 | void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) { | |
237 | dout(20) << dendl; | |
238 | ||
239 | if (!f) { | |
240 | return; | |
241 | } | |
242 | ||
243 | Mutex::Locker locker(m_lock); | |
244 | ||
245 | f->open_array_section("image_replayers"); | |
246 | for (auto &kv : m_image_replayers) { | |
247 | auto &image_replayer = kv.second; | |
248 | image_replayer->print_status(f, ss); | |
249 | } | |
250 | f->close_section(); | |
251 | } | |
252 | ||
253 | template <typename I> | |
254 | void InstanceReplayer<I>::start() | |
255 | { | |
256 | dout(20) << dendl; | |
257 | ||
258 | Mutex::Locker locker(m_lock); | |
259 | ||
260 | m_manual_stop = false; | |
261 | ||
262 | for (auto &kv : m_image_replayers) { | |
263 | auto &image_replayer = kv.second; | |
264 | image_replayer->start(nullptr, true); | |
265 | } | |
266 | } | |
267 | ||
268 | template <typename I> | |
269 | void InstanceReplayer<I>::stop() | |
270 | { | |
271 | dout(20) << dendl; | |
272 | ||
273 | Mutex::Locker locker(m_lock); | |
274 | ||
275 | m_manual_stop = true; | |
276 | ||
277 | for (auto &kv : m_image_replayers) { | |
278 | auto &image_replayer = kv.second; | |
279 | image_replayer->stop(nullptr, true); | |
280 | } | |
281 | } | |
282 | ||
283 | template <typename I> | |
284 | void InstanceReplayer<I>::restart() | |
285 | { | |
286 | dout(20) << dendl; | |
287 | ||
288 | Mutex::Locker locker(m_lock); | |
289 | ||
290 | m_manual_stop = false; | |
291 | ||
292 | for (auto &kv : m_image_replayers) { | |
293 | auto &image_replayer = kv.second; | |
294 | image_replayer->restart(); | |
295 | } | |
296 | } | |
297 | ||
298 | template <typename I> | |
299 | void InstanceReplayer<I>::flush() | |
300 | { | |
301 | dout(20) << "enter" << dendl; | |
302 | ||
303 | Mutex::Locker locker(m_lock); | |
304 | ||
305 | for (auto &kv : m_image_replayers) { | |
306 | auto &image_replayer = kv.second; | |
307 | image_replayer->flush(); | |
308 | } | |
309 | } | |
310 | ||
311 | template <typename I> | |
312 | void InstanceReplayer<I>::start_image_replayer( | |
313 | ImageReplayer<I> *image_replayer) { | |
314 | assert(m_lock.is_locked()); | |
315 | ||
316 | std::string global_image_id = image_replayer->get_global_image_id(); | |
317 | dout(20) << "global_image_id=" << global_image_id << dendl; | |
318 | ||
319 | if (!image_replayer->is_stopped()) { | |
320 | return; | |
321 | } else if (image_replayer->is_blacklisted()) { | |
322 | derr << "blacklisted detected during image replay" << dendl; | |
323 | return; | |
324 | } | |
325 | ||
326 | FunctionContext *ctx = new FunctionContext( | |
327 | [this, global_image_id] (int r) { | |
328 | dout(20) << "image deleter result: r=" << r << ", " | |
329 | << "global_image_id=" << global_image_id << dendl; | |
330 | ||
331 | Mutex::Locker locker(m_lock); | |
332 | m_async_op_tracker.finish_op(); | |
333 | ||
334 | if (r == -ESTALE || r == -ECANCELED) { | |
335 | return; | |
336 | } | |
337 | ||
338 | auto it = m_image_replayers.find(global_image_id); | |
339 | if (it == m_image_replayers.end()) { | |
340 | return; | |
341 | } | |
342 | ||
343 | auto image_replayer = it->second; | |
344 | if (r >= 0) { | |
345 | image_replayer->start(nullptr, false); | |
346 | } else { | |
347 | start_image_replayer(image_replayer); | |
348 | } | |
349 | }); | |
350 | ||
351 | m_async_op_tracker.start_op(); | |
352 | m_image_deleter->wait_for_scheduled_deletion( | |
353 | m_local_pool_id, image_replayer->get_global_image_id(), ctx, false); | |
354 | } | |
355 | ||
356 | template <typename I> | |
c07f9fc5 | 357 | void InstanceReplayer<I>::queue_start_image_replayers() { |
7c673cae FG |
358 | dout(20) << dendl; |
359 | ||
c07f9fc5 FG |
360 | Context *ctx = create_context_callback< |
361 | InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this); | |
7c673cae FG |
362 | m_async_op_tracker.start_op(); |
363 | m_threads->work_queue->queue(ctx, 0); | |
364 | } | |
365 | ||
c07f9fc5 FG |
366 | template <typename I> |
367 | void InstanceReplayer<I>::start_image_replayers(int r) { | |
368 | dout(20) << dendl; | |
369 | ||
370 | Mutex::Locker locker(m_lock); | |
371 | if (m_on_shut_down != nullptr) { | |
372 | return; | |
373 | } | |
374 | ||
375 | size_t image_count = 0; | |
376 | size_t warning_count = 0; | |
377 | size_t error_count = 0; | |
378 | for (auto &it : m_image_replayers) { | |
379 | ++image_count; | |
380 | auto health_state = it.second->get_health_state(); | |
381 | if (health_state == image_replayer::HEALTH_STATE_WARNING) { | |
382 | ++warning_count; | |
383 | } else if (health_state == image_replayer::HEALTH_STATE_ERROR) { | |
384 | ++error_count; | |
385 | } | |
386 | ||
387 | start_image_replayer(it.second); | |
388 | } | |
389 | ||
390 | m_service_daemon->add_or_update_attribute( | |
391 | m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); | |
392 | m_service_daemon->add_or_update_attribute( | |
393 | m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); | |
394 | m_service_daemon->add_or_update_attribute( | |
395 | m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); | |
396 | ||
397 | m_async_op_tracker.finish_op(); | |
398 | } | |
7c673cae FG |
399 | |
400 | template <typename I> | |
401 | void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer, | |
402 | Context *on_finish) { | |
403 | dout(20) << image_replayer << " global_image_id=" | |
404 | << image_replayer->get_global_image_id() << ", on_finish=" | |
405 | << on_finish << dendl; | |
406 | ||
407 | if (image_replayer->is_stopped()) { | |
408 | m_threads->work_queue->queue(on_finish, 0); | |
409 | return; | |
410 | } | |
411 | ||
412 | m_async_op_tracker.start_op(); | |
413 | Context *ctx = create_async_context_callback( | |
414 | m_threads->work_queue, new FunctionContext( | |
415 | [this, image_replayer, on_finish] (int r) { | |
416 | stop_image_replayer(image_replayer, on_finish); | |
417 | m_async_op_tracker.finish_op(); | |
418 | })); | |
419 | ||
420 | if (image_replayer->is_running()) { | |
421 | image_replayer->stop(ctx, false); | |
422 | } else { | |
423 | int after = 1; | |
424 | dout(20) << "scheduling image replayer " << image_replayer << " stop after " | |
425 | << after << " sec (task " << ctx << ")" << dendl; | |
426 | ctx = new FunctionContext( | |
427 | [this, after, ctx] (int r) { | |
428 | Mutex::Locker timer_locker(m_threads->timer_lock); | |
429 | m_threads->timer->add_event_after(after, ctx); | |
430 | }); | |
431 | m_threads->work_queue->queue(ctx, 0); | |
432 | } | |
433 | } | |
434 | ||
435 | template <typename I> | |
436 | void InstanceReplayer<I>::wait_for_ops() { | |
437 | dout(20) << dendl; | |
438 | ||
439 | Context *ctx = create_context_callback< | |
440 | InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this); | |
441 | ||
442 | m_async_op_tracker.wait_for_ops(ctx); | |
443 | } | |
444 | ||
445 | template <typename I> | |
446 | void InstanceReplayer<I>::handle_wait_for_ops(int r) { | |
447 | dout(20) << "r=" << r << dendl; | |
448 | ||
449 | assert(r == 0); | |
450 | ||
451 | Mutex::Locker locker(m_lock); | |
452 | stop_image_replayers(); | |
453 | } | |
454 | ||
455 | template <typename I> | |
456 | void InstanceReplayer<I>::stop_image_replayers() { | |
457 | dout(20) << dendl; | |
458 | ||
459 | assert(m_lock.is_locked()); | |
460 | ||
461 | Context *ctx = create_async_context_callback( | |
462 | m_threads->work_queue, create_context_callback<InstanceReplayer<I>, | |
463 | &InstanceReplayer<I>::handle_stop_image_replayers>(this)); | |
464 | ||
465 | C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); | |
466 | for (auto &it : m_image_replayers) { | |
467 | stop_image_replayer(it.second, gather_ctx->new_sub()); | |
468 | } | |
469 | gather_ctx->activate(); | |
470 | } | |
471 | ||
472 | template <typename I> | |
473 | void InstanceReplayer<I>::handle_stop_image_replayers(int r) { | |
474 | dout(20) << "r=" << r << dendl; | |
475 | ||
476 | assert(r == 0); | |
477 | ||
478 | Context *on_finish = nullptr; | |
479 | { | |
480 | Mutex::Locker locker(m_lock); | |
481 | ||
482 | for (auto &it : m_image_replayers) { | |
483 | assert(it.second->is_stopped()); | |
484 | it.second->destroy(); | |
485 | } | |
486 | m_image_replayers.clear(); | |
487 | ||
488 | assert(m_on_shut_down != nullptr); | |
489 | std::swap(on_finish, m_on_shut_down); | |
490 | } | |
491 | on_finish->complete(r); | |
492 | } | |
493 | ||
494 | template <typename I> | |
495 | void InstanceReplayer<I>::cancel_image_state_check_task() { | |
496 | Mutex::Locker timer_locker(m_threads->timer_lock); | |
497 | ||
498 | if (m_image_state_check_task == nullptr) { | |
499 | return; | |
500 | } | |
501 | ||
502 | dout(20) << m_image_state_check_task << dendl; | |
503 | bool canceled = m_threads->timer->cancel_event(m_image_state_check_task); | |
504 | assert(canceled); | |
505 | m_image_state_check_task = nullptr; | |
506 | } | |
507 | ||
508 | template <typename I> | |
509 | void InstanceReplayer<I>::schedule_image_state_check_task() { | |
510 | assert(m_threads->timer_lock.is_locked()); | |
511 | assert(m_image_state_check_task == nullptr); | |
512 | ||
513 | m_image_state_check_task = new FunctionContext( | |
514 | [this](int r) { | |
515 | assert(m_threads->timer_lock.is_locked()); | |
516 | m_image_state_check_task = nullptr; | |
517 | schedule_image_state_check_task(); | |
c07f9fc5 | 518 | queue_start_image_replayers(); |
7c673cae FG |
519 | }); |
520 | ||
c07f9fc5 | 521 | int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval; |
7c673cae FG |
522 | |
523 | dout(20) << "scheduling image state check after " << after << " sec (task " | |
524 | << m_image_state_check_task << ")" << dendl; | |
525 | m_threads->timer->add_event_after(after, m_image_state_check_task); | |
526 | } | |
527 | ||
528 | } // namespace mirror | |
529 | } // namespace rbd | |
530 | ||
531 | template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>; |