]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/stringify.h" | |
5 | #include "common/Timer.h" | |
6 | #include "common/debug.h" | |
7 | #include "common/errno.h" | |
8 | #include "librbd/Utils.h" | |
9 | #include "ImageReplayer.h" | |
10 | #include "InstanceReplayer.h" | |
11 | #include "Threads.h" | |
12 | ||
13 | #define dout_context g_ceph_context | |
14 | #define dout_subsys ceph_subsys_rbd_mirror | |
15 | #undef dout_prefix | |
16 | #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ | |
17 | << this << " " << __func__ << ": " | |
18 | ||
19 | namespace rbd { | |
20 | namespace mirror { | |
21 | ||
22 | using librbd::util::create_async_context_callback; | |
23 | using librbd::util::create_context_callback; | |
24 | ||
// Constructs the replayer responsible for all mirrored images of one local
// pool.  All handles passed in (threads, deleter, sync throttler, rados
// connection) are shared resources owned elsewhere.
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
  Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
  ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
  const std::string &local_mirror_uuid, int64_t local_pool_id)
  : m_threads(threads), m_image_deleter(image_deleter),
    m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
    // embed the pool id in the lock name so debug output identifies which
    // InstanceReplayer instance a lock belongs to
    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}
35 | ||
// Destructor: shut_down() must have fully completed first -- no pending
// image-state-check timer task, no in-flight tracked async ops, and all
// image replayers stopped, destroyed and removed from the map.
template <typename I>
InstanceReplayer<I>::~InstanceReplayer() {
  assert(m_image_state_check_task == nullptr);
  assert(m_async_op_tracker.empty());
  assert(m_image_replayers.empty());
}
42 | ||
43 | template <typename I> | |
44 | int InstanceReplayer<I>::init() { | |
45 | C_SaferCond init_ctx; | |
46 | init(&init_ctx); | |
47 | return init_ctx.wait(); | |
48 | } | |
49 | ||
// Asynchronous initialization: arms the periodic image-state-check timer
// task and then completes on_finish with 0.  The work is deferred to the
// work queue; the timer lock is taken only inside the queued context.
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
  dout(20) << dendl;

  Context *ctx = new FunctionContext(
    [this, on_finish] (int r) {
      {
        // schedule_image_state_check_task() asserts the timer lock is held
        Mutex::Locker timer_locker(m_threads->timer_lock);
        schedule_image_state_check_task();
      }
      on_finish->complete(0);
    });

  m_threads->work_queue->queue(ctx, 0);
}
65 | ||
66 | template <typename I> | |
67 | void InstanceReplayer<I>::shut_down() { | |
68 | C_SaferCond shut_down_ctx; | |
69 | shut_down(&shut_down_ctx); | |
70 | int r = shut_down_ctx.wait(); | |
71 | assert(r == 0); | |
72 | } | |
73 | ||
// Begins asynchronous shutdown.  on_finish is stashed in m_on_shut_down
// (only one shutdown may be in progress) and is ultimately completed by
// handle_stop_image_replayers() once the timer task is cancelled, all
// tracked async ops have drained, and every image replayer has stopped.
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  assert(m_on_shut_down == nullptr);
  m_on_shut_down = on_finish;

  // defer to the work queue so the timer lock and op-tracker wait are not
  // entered while this thread holds m_lock
  Context *ctx = new FunctionContext(
    [this] (int r) {
      cancel_image_state_check_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}
91 | ||
92 | template <typename I> | |
93 | void InstanceReplayer<I>::add_peer(std::string mirror_uuid, | |
94 | librados::IoCtx io_ctx) { | |
95 | dout(20) << mirror_uuid << dendl; | |
96 | ||
97 | Mutex::Locker locker(m_lock); | |
98 | auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second; | |
99 | assert(result); | |
100 | } | |
101 | ||
102 | template <typename I> | |
103 | void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) { | |
104 | dout(20) << mirror_uuid << dendl; | |
105 | ||
106 | Mutex::Locker locker(m_lock); | |
107 | auto result = m_peers.erase(Peer(mirror_uuid)); | |
108 | assert(result > 0); | |
109 | } | |
110 | ||
// Stops and destroys every image replayer, completing on_finish once all
// have finished stopping.  Entries are erased from the map while iterating
// (erase() yields the next iterator), so the map is empty on return even
// though the stop operations themselves complete asynchronously.
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
       it = m_image_replayers.erase(it)) {
    auto image_replayer = it->second;
    auto ctx = gather_ctx->new_sub();
    // wrap the gather sub-context so the replayer is destroyed only after
    // its stop has completed; the sub is always completed with 0
    ctx = new FunctionContext(
      [image_replayer, ctx] (int r) {
        image_replayer->destroy();
        ctx->complete(0);
      });
    stop_image_replayer(image_replayer, ctx);
  }
  gather_ctx->activate();
}
131 | ||
// Takes responsibility for replaying a global image: creates an
// ImageReplayer for it on first sight, optionally attaches a remote peer
// image, and (re)starts the replayer.  on_finish is completed
// asynchronously via the work queue.  Must not race with shutdown.
template <typename I>
void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
                                        const std::string &peer_mirror_uuid,
                                        const std::string &peer_image_id,
                                        Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;

  Mutex::Locker locker(m_lock);

  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);

  if (it == m_image_replayers.end()) {
    auto image_replayer = ImageReplayer<I>::create(
      m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
      m_local_mirror_uuid, m_local_pool_id, global_image_id);

    dout(20) << global_image_id << ": creating replayer " << image_replayer
             << dendl;

    it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                 image_replayer)).first;
  }

  auto image_replayer = it->second;
  if (!peer_mirror_uuid.empty()) {
    // the peer must have been registered via add_peer() beforehand
    auto iter = m_peers.find(Peer(peer_mirror_uuid));
    assert(iter != m_peers.end());
    auto io_ctx = iter->io_ctx;

    image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
  }
  start_image_replayer(image_replayer);

  m_threads->work_queue->queue(on_finish, 0);
}
170 | ||
// Relinquishes responsibility for a global image.  Detaches the given peer
// image (if any); if other peers remain attached the replayer is kept.
// Otherwise the replayer is removed from the map, stopped, optionally the
// local image is queued for deletion, and the replayer is destroyed before
// on_finish completes.  Must not race with shutdown.
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                        const std::string &peer_mirror_uuid,
                                        const std::string &peer_image_id,
                                        bool schedule_delete,
                                        Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;

  Mutex::Locker locker(m_lock);

  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);

  if (it == m_image_replayers.end()) {
    dout(20) << global_image_id << ": not found" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  auto image_replayer = it->second;
  if (!peer_mirror_uuid.empty()) {
    image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
                                        schedule_delete);
  }

  if (!image_replayer->remote_images_empty()) {
    dout(20) << global_image_id << ": still has peer images" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  m_image_replayers.erase(it);

  // innermost wrapper: destroy the replayer, then run the caller's context
  on_finish = new FunctionContext(
    [image_replayer, on_finish] (int r) {
      image_replayer->destroy();
      on_finish->complete(0);
    });

  if (schedule_delete) {
    // outer wrapper runs first once the stop completes: queue deletion of
    // the local image copy, then fall through to the destroy wrapper above
    on_finish = new FunctionContext(
      [this, image_replayer, on_finish] (int r) {
        auto global_image_id = image_replayer->get_global_image_id();
        m_image_deleter->schedule_image_delete(
          m_local_rados, m_local_pool_id, global_image_id);
        on_finish->complete(0);
      });
  }

  stop_image_replayer(image_replayer, on_finish);
}
224 | ||
225 | template <typename I> | |
226 | void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) { | |
227 | dout(20) << dendl; | |
228 | ||
229 | if (!f) { | |
230 | return; | |
231 | } | |
232 | ||
233 | Mutex::Locker locker(m_lock); | |
234 | ||
235 | f->open_array_section("image_replayers"); | |
236 | for (auto &kv : m_image_replayers) { | |
237 | auto &image_replayer = kv.second; | |
238 | image_replayer->print_status(f, ss); | |
239 | } | |
240 | f->close_section(); | |
241 | } | |
242 | ||
243 | template <typename I> | |
244 | void InstanceReplayer<I>::start() | |
245 | { | |
246 | dout(20) << dendl; | |
247 | ||
248 | Mutex::Locker locker(m_lock); | |
249 | ||
250 | m_manual_stop = false; | |
251 | ||
252 | for (auto &kv : m_image_replayers) { | |
253 | auto &image_replayer = kv.second; | |
254 | image_replayer->start(nullptr, true); | |
255 | } | |
256 | } | |
257 | ||
258 | template <typename I> | |
259 | void InstanceReplayer<I>::stop() | |
260 | { | |
261 | dout(20) << dendl; | |
262 | ||
263 | Mutex::Locker locker(m_lock); | |
264 | ||
265 | m_manual_stop = true; | |
266 | ||
267 | for (auto &kv : m_image_replayers) { | |
268 | auto &image_replayer = kv.second; | |
269 | image_replayer->stop(nullptr, true); | |
270 | } | |
271 | } | |
272 | ||
273 | template <typename I> | |
274 | void InstanceReplayer<I>::restart() | |
275 | { | |
276 | dout(20) << dendl; | |
277 | ||
278 | Mutex::Locker locker(m_lock); | |
279 | ||
280 | m_manual_stop = false; | |
281 | ||
282 | for (auto &kv : m_image_replayers) { | |
283 | auto &image_replayer = kv.second; | |
284 | image_replayer->restart(); | |
285 | } | |
286 | } | |
287 | ||
288 | template <typename I> | |
289 | void InstanceReplayer<I>::flush() | |
290 | { | |
291 | dout(20) << "enter" << dendl; | |
292 | ||
293 | Mutex::Locker locker(m_lock); | |
294 | ||
295 | for (auto &kv : m_image_replayers) { | |
296 | auto &image_replayer = kv.second; | |
297 | image_replayer->flush(); | |
298 | } | |
299 | } | |
300 | ||
// Attempts to start a single image replayer (m_lock must be held).  The
// start is gated on the image deleter: we first wait for any scheduled
// deletion of this image to complete, then start.  The callback retries on
// deletion failure, gives up on -ESTALE/-ECANCELED, and tolerates the
// replayer having been released in the meantime.
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
    ImageReplayer<I> *image_replayer) {
  assert(m_lock.is_locked());

  std::string global_image_id = image_replayer->get_global_image_id();
  dout(20) << "global_image_id=" << global_image_id << dendl;

  // only a fully stopped, non-blacklisted replayer may be (re)started
  if (!image_replayer->is_stopped()) {
    return;
  } else if (image_replayer->is_blacklisted()) {
    derr << "blacklisted detected during image replay" << dendl;
    return;
  }

  FunctionContext *ctx = new FunctionContext(
    [this, global_image_id] (int r) {
      dout(20) << "image deleter result: r=" << r << ", "
               << "global_image_id=" << global_image_id << dendl;

      Mutex::Locker locker(m_lock);
      m_async_op_tracker.finish_op();

      if (r == -ESTALE || r == -ECANCELED) {
        return;
      }

      // the replayer may have been released while we waited
      auto it = m_image_replayers.find(global_image_id);
      if (it == m_image_replayers.end()) {
        return;
      }

      auto image_replayer = it->second;
      if (r >= 0) {
        image_replayer->start(nullptr, false);
      } else {
        // deletion failed: retry the wait-then-start cycle
        start_image_replayer(image_replayer);
      }
    });

  m_async_op_tracker.start_op();
  m_image_deleter->wait_for_scheduled_deletion(
    m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
}
345 | ||
// Asynchronously attempts to start every known image replayer (invoked
// periodically by the image-state-check timer task).  Skipped entirely if
// a shutdown has begun.  Tracked via the async op tracker so shutdown can
// drain it.
template <typename I>
void InstanceReplayer<I>::start_image_replayers() {
  dout(20) << dendl;

  Context *ctx = new FunctionContext(
    [this] (int r) {
      Mutex::Locker locker(m_lock);
      m_async_op_tracker.finish_op();
      if (m_on_shut_down != nullptr) {
        return;
      }
      for (auto &it : m_image_replayers) {
        start_image_replayer(it.second);
      }
    });

  m_async_op_tracker.start_op();
  m_threads->work_queue->queue(ctx, 0);
}
365 | ||
366 | ||
// Stops one image replayer, completing on_finish once it is fully stopped.
// If the replayer is mid-transition (neither stopped nor running) the stop
// is retried after a 1 second delay; the retry context re-enters this
// function until the replayer reaches the stopped state.
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                              Context *on_finish) {
  dout(20) << image_replayer << " global_image_id="
           << image_replayer->get_global_image_id() << ", on_finish="
           << on_finish << dendl;

  if (image_replayer->is_stopped()) {
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  // retry context: re-invoke this function (async via the work queue)
  m_async_op_tracker.start_op();
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, new FunctionContext(
      [this, image_replayer, on_finish] (int r) {
        stop_image_replayer(image_replayer, on_finish);
        m_async_op_tracker.finish_op();
      }));

  if (image_replayer->is_running()) {
    image_replayer->stop(ctx, false);
  } else {
    int after = 1;
    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
             << after << " sec (task " << ctx << ")" << dendl;
    // the timer event is registered from a queued context so timer_lock is
    // acquired outside whatever locks the caller currently holds
    // (presumably to respect lock ordering -- NOTE(review): confirm)
    ctx = new FunctionContext(
      [this, after, ctx] (int r) {
        Mutex::Locker timer_locker(m_threads->timer_lock);
        m_threads->timer->add_event_after(after, ctx);
      });
    m_threads->work_queue->queue(ctx, 0);
  }
}
401 | ||
// Shutdown step: waits for all tracked async ops to drain, then continues
// in handle_wait_for_ops().
template <typename I>
void InstanceReplayer<I>::wait_for_ops() {
  dout(20) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);

  m_async_op_tracker.wait_for_ops(ctx);
}
411 | ||
412 | template <typename I> | |
413 | void InstanceReplayer<I>::handle_wait_for_ops(int r) { | |
414 | dout(20) << "r=" << r << dendl; | |
415 | ||
416 | assert(r == 0); | |
417 | ||
418 | Mutex::Locker locker(m_lock); | |
419 | stop_image_replayers(); | |
420 | } | |
421 | ||
// Shutdown step (m_lock held): stops every image replayer; once all have
// stopped, handle_stop_image_replayers() is invoked asynchronously via the
// work queue.
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
  dout(20) << dendl;

  assert(m_lock.is_locked());

  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
      &InstanceReplayer<I>::handle_stop_image_replayers>(this));

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
  for (auto &it : m_image_replayers) {
    stop_image_replayer(it.second, gather_ctx->new_sub());
  }
  gather_ctx->activate();
}
438 | ||
// Final shutdown step: every replayer is now stopped, so destroy them all,
// clear the map, and complete the stashed shutdown context.  The context is
// swapped out and completed outside m_lock.
template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Context *on_finish = nullptr;
  {
    Mutex::Locker locker(m_lock);

    for (auto &it : m_image_replayers) {
      assert(it.second->is_stopped());
      it.second->destroy();
    }
    m_image_replayers.clear();

    assert(m_on_shut_down != nullptr);
    std::swap(on_finish, m_on_shut_down);
  }
  on_finish->complete(r);
}
460 | ||
// Cancels the pending image-state-check timer task, if any.  Taking the
// timer lock first means the task cannot be mid-execution here (it runs and
// clears itself under the same lock), so cancel_event() must succeed.
template <typename I>
void InstanceReplayer<I>::cancel_image_state_check_task() {
  Mutex::Locker timer_locker(m_threads->timer_lock);

  if (m_image_state_check_task == nullptr) {
    return;
  }

  dout(20) << m_image_state_check_task << dendl;
  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
  assert(canceled);
  m_image_state_check_task = nullptr;
}
474 | ||
// Arms the periodic image-state-check timer task (timer lock must be held;
// no task may already be pending).  When it fires, the task clears its own
// pointer, re-arms itself, and kicks start_image_replayers().  The interval
// comes from rbd_mirror_image_state_check_interval, clamped to >= 1 sec.
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
  assert(m_threads->timer_lock.is_locked());
  assert(m_image_state_check_task == nullptr);

  m_image_state_check_task = new FunctionContext(
    [this](int r) {
      assert(m_threads->timer_lock.is_locked());
      m_image_state_check_task = nullptr;
      schedule_image_state_check_task();
      start_image_replayers();
    });

  int after =
    max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval);

  dout(20) << "scheduling image state check after " << after << " sec (task "
           << m_image_state_check_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_image_state_check_task);
}
495 | ||
496 | } // namespace mirror | |
497 | } // namespace rbd | |
498 | ||
499 | template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>; |