]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/stringify.h" | |
5 | #include "common/Timer.h" | |
6 | #include "common/debug.h" | |
7 | #include "common/errno.h" | |
8 | #include "librbd/Utils.h" | |
9 | #include "ImageReplayer.h" | |
10 | #include "InstanceReplayer.h" | |
c07f9fc5 | 11 | #include "ServiceDaemon.h" |
7c673cae FG |
12 | #include "Threads.h" |
13 | ||
14 | #define dout_context g_ceph_context | |
15 | #define dout_subsys ceph_subsys_rbd_mirror | |
16 | #undef dout_prefix | |
17 | #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ | |
18 | << this << " " << __func__ << ": " | |
19 | ||
20 | namespace rbd { | |
21 | namespace mirror { | |
22 | ||
c07f9fc5 FG |
namespace {

// Attribute keys for per-pool counters published to the service daemon
// (see start_image_replayers()).
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");

} // anonymous namespace
30 | ||
7c673cae FG |
31 | using librbd::util::create_async_context_callback; |
32 | using librbd::util::create_context_callback; | |
33 | ||
// Construct a replayer for all mirrored images assigned to this instance
// within a single local pool. All pointers/refs are borrowed from the caller
// and must outlive this object.
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
    ImageDeleter<I>* image_deleter, RadosRef local_rados,
    const std::string &local_mirror_uuid, int64_t local_pool_id)
  : m_threads(threads), m_service_daemon(service_daemon),
    m_image_deleter(image_deleter), m_local_rados(local_rados),
    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
    // embed the pool id in the lock name to distinguish instances
    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}
44 | ||
template <typename I>
InstanceReplayer<I>::~InstanceReplayer() {
  // shut_down() must have fully completed before destruction: no pending
  // timer task, no in-flight async ops, and all image replayers destroyed
  assert(m_image_state_check_task == nullptr);
  assert(m_async_op_tracker.empty());
  assert(m_image_replayers.empty());
}
51 | ||
52 | template <typename I> | |
53 | int InstanceReplayer<I>::init() { | |
54 | C_SaferCond init_ctx; | |
55 | init(&init_ctx); | |
56 | return init_ctx.wait(); | |
57 | } | |
58 | ||
// Asynchronous init: arms the periodic image-state check timer from the
// work queue and then completes on_finish with 0 (init cannot fail).
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
  dout(20) << dendl;

  Context *ctx = new FunctionContext(
    [this, on_finish] (int r) {
      {
        // schedule_image_state_check_task() requires the timer lock
        Mutex::Locker timer_locker(m_threads->timer_lock);
        schedule_image_state_check_task();
      }
      on_finish->complete(0);
    });

  m_threads->work_queue->queue(ctx, 0);
}
74 | ||
75 | template <typename I> | |
76 | void InstanceReplayer<I>::shut_down() { | |
77 | C_SaferCond shut_down_ctx; | |
78 | shut_down(&shut_down_ctx); | |
79 | int r = shut_down_ctx.wait(); | |
80 | assert(r == 0); | |
81 | } | |
82 | ||
// Asynchronous shut down: records on_finish and kicks off the shut down
// sequence (cancel timer -> drain async ops -> stop replayers) from the
// work queue; on_finish is completed by handle_stop_image_replayers().
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  // only a single shut down may be in progress
  assert(m_on_shut_down == nullptr);
  m_on_shut_down = on_finish;

  Context *ctx = new FunctionContext(
    [this] (int r) {
      // stop the periodic state check, then wait for tracked async ops;
      // wait_for_ops() continues via handle_wait_for_ops()
      cancel_image_state_check_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}
100 | ||
101 | template <typename I> | |
d2e6a577 | 102 | void InstanceReplayer<I>::add_peer(std::string peer_uuid, |
7c673cae | 103 | librados::IoCtx io_ctx) { |
d2e6a577 | 104 | dout(20) << peer_uuid << dendl; |
7c673cae FG |
105 | |
106 | Mutex::Locker locker(m_lock); | |
d2e6a577 | 107 | auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second; |
7c673cae FG |
108 | assert(result); |
109 | } | |
110 | ||
// Stop and destroy every managed image replayer; on_finish fires once all
// of them have been torn down.
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
  // drain the map while iterating: erase() yields the next valid iterator
  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
       it = m_image_replayers.erase(it)) {
    auto image_replayer = it->second;
    auto ctx = gather_ctx->new_sub();
    ctx = new FunctionContext(
      [image_replayer, ctx] (int r) {
        // destroy only after the replayer has fully stopped
        image_replayer->destroy();
        ctx->complete(0);
      });
    stop_image_replayer(image_replayer, ctx);
  }
  gather_ctx->activate();
}
131 | ||
// Take responsibility for replaying the given global image id: create an
// ImageReplayer on first acquisition, then (re)start it. on_finish is
// queued immediately; the start itself proceeds asynchronously.
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                        const std::string &global_image_id,
                                        Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << dendl;

  Mutex::Locker locker(m_lock);

  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    // first time this image is assigned here: create its replayer
    auto image_replayer = ImageReplayer<I>::create(
      m_threads, m_image_deleter, instance_watcher, m_local_rados,
      m_local_mirror_uuid, m_local_pool_id, global_image_id);

    dout(20) << global_image_id << ": creating replayer " << image_replayer
             << dendl;

    it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                 image_replayer)).first;

    // TODO only a single peer is currently supported
    assert(m_peers.size() == 1);
    auto peer = *m_peers.begin();
    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
  }

  auto& image_replayer = it->second;
  // TODO temporary until policy integrated
  image_replayer->set_finished(false);

  start_image_replayer(image_replayer);
  m_threads->work_queue->queue(on_finish, 0);
}
167 | ||
// Drop responsibility for the given global image id: remove it from the
// map, stop its replayer, destroy it, then complete on_finish.
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                        Context *on_finish) {
  dout(20) << "global_image_id=" << global_image_id << dendl;

  Mutex::Locker locker(m_lock);
  assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    // not managed here; nothing to release
    dout(20) << global_image_id << ": not found" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  auto image_replayer = it->second;
  m_image_replayers.erase(it);

  // destroy the replayer only after it has fully stopped
  on_finish = new FunctionContext(
    [image_replayer, on_finish] (int r) {
      image_replayer->destroy();
      on_finish->complete(0);
    });
  stop_image_replayer(image_replayer, on_finish);
}
7c673cae | 193 | |
d2e6a577 FG |
194 | template <typename I> |
195 | void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id, | |
196 | const std::string &peer_mirror_uuid, | |
197 | Context *on_finish) { | |
198 | dout(20) << "global_image_id=" << global_image_id << ", " | |
199 | << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; | |
7c673cae | 200 | |
d2e6a577 FG |
201 | Mutex::Locker locker(m_lock); |
202 | assert(m_on_shut_down == nullptr); | |
203 | ||
204 | auto it = m_image_replayers.find(global_image_id); | |
205 | if (it != m_image_replayers.end()) { | |
206 | // TODO only a single peer is currently supported, therefore | |
207 | // we can just interrupt the current image replayer and | |
208 | // it will eventually detect that the peer image is missing and | |
209 | // determine if a delete propagation is required. | |
210 | auto image_replayer = it->second; | |
211 | image_replayer->restart(); | |
212 | } | |
213 | m_threads->work_queue->queue(on_finish, 0); | |
7c673cae FG |
214 | } |
215 | ||
216 | template <typename I> | |
217 | void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) { | |
218 | dout(20) << dendl; | |
219 | ||
220 | if (!f) { | |
221 | return; | |
222 | } | |
223 | ||
224 | Mutex::Locker locker(m_lock); | |
225 | ||
226 | f->open_array_section("image_replayers"); | |
227 | for (auto &kv : m_image_replayers) { | |
228 | auto &image_replayer = kv.second; | |
229 | image_replayer->print_status(f, ss); | |
230 | } | |
231 | f->close_section(); | |
232 | } | |
233 | ||
234 | template <typename I> | |
235 | void InstanceReplayer<I>::start() | |
236 | { | |
237 | dout(20) << dendl; | |
238 | ||
239 | Mutex::Locker locker(m_lock); | |
240 | ||
241 | m_manual_stop = false; | |
242 | ||
243 | for (auto &kv : m_image_replayers) { | |
244 | auto &image_replayer = kv.second; | |
245 | image_replayer->start(nullptr, true); | |
246 | } | |
247 | } | |
248 | ||
249 | template <typename I> | |
250 | void InstanceReplayer<I>::stop() | |
251 | { | |
252 | dout(20) << dendl; | |
253 | ||
254 | Mutex::Locker locker(m_lock); | |
255 | ||
256 | m_manual_stop = true; | |
257 | ||
258 | for (auto &kv : m_image_replayers) { | |
259 | auto &image_replayer = kv.second; | |
260 | image_replayer->stop(nullptr, true); | |
261 | } | |
262 | } | |
263 | ||
264 | template <typename I> | |
265 | void InstanceReplayer<I>::restart() | |
266 | { | |
267 | dout(20) << dendl; | |
268 | ||
269 | Mutex::Locker locker(m_lock); | |
270 | ||
271 | m_manual_stop = false; | |
272 | ||
273 | for (auto &kv : m_image_replayers) { | |
274 | auto &image_replayer = kv.second; | |
275 | image_replayer->restart(); | |
276 | } | |
277 | } | |
278 | ||
279 | template <typename I> | |
280 | void InstanceReplayer<I>::flush() | |
281 | { | |
282 | dout(20) << "enter" << dendl; | |
283 | ||
284 | Mutex::Locker locker(m_lock); | |
285 | ||
286 | for (auto &kv : m_image_replayers) { | |
287 | auto &image_replayer = kv.second; | |
288 | image_replayer->flush(); | |
289 | } | |
290 | } | |
291 | ||
// Start a single image replayer if it is eligible. NOTE: may erase the
// replayer from m_image_replayers and destroy it (finished case), so
// callers iterating the map must not rely on the current iterator.
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
    ImageReplayer<I> *image_replayer) {
  assert(m_lock.is_locked());

  std::string global_image_id = image_replayer->get_global_image_id();
  dout(20) << "global_image_id=" << global_image_id << dendl;

  if (!image_replayer->is_stopped()) {
    // already running (or transitioning) -- nothing to do
    return;
  } else if (image_replayer->is_blacklisted()) {
    derr << "blacklisted detected during image replay" << dendl;
    return;
  } else if (image_replayer->is_finished()) {
    // TODO temporary until policy integrated
    dout(5) << "removing image replayer for global_image_id="
            << global_image_id << dendl;
    m_image_replayers.erase(image_replayer->get_global_image_id());
    image_replayer->destroy();
    return;
  }

  image_replayer->start(nullptr, false);
}
316 | ||
// Queue an asynchronous start_image_replayers() pass on the work queue.
template <typename I>
void InstanceReplayer<I>::queue_start_image_replayers() {
  dout(20) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
  // tracked so shut_down() can wait for the queued callback to drain
  m_async_op_tracker.start_op();
  m_threads->work_queue->queue(ctx, 0);
}
326 | ||
c07f9fc5 FG |
// Periodic pass over all managed replayers: restart eligible ones, tally
// health states, and publish the counters to the service daemon. Invoked
// via queue_start_image_replayers(); balances its start_op().
template <typename I>
void InstanceReplayer<I>::start_image_replayers(int r) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  if (m_on_shut_down != nullptr) {
    // shutting down -- skip the pass (op still finished below? no: early
    // return intentionally leaves finish_op to the shut down path)
    // NOTE(review): early return skips m_async_op_tracker.finish_op() --
    // verify the shut down path accounts for this op
    return;
  }

  uint64_t image_count = 0;
  uint64_t warning_count = 0;
  uint64_t error_count = 0;
  for (auto it = m_image_replayers.begin();
       it != m_image_replayers.end();) {
    // start_image_replayer() may erase the current entry (finished
    // replayer), so advance the iterator before using it
    auto current_it(it);
    ++it;

    ++image_count;
    auto health_state = current_it->second->get_health_state();
    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
      ++warning_count;
    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
      ++error_count;
    }

    start_image_replayer(current_it->second);
  }

  // publish per-pool aggregate counters to the service daemon
  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);

  m_async_op_tracker.finish_op();
}
7c673cae FG |
364 | |
// Drive one image replayer to the stopped state, retrying until it gets
// there; on_finish fires once is_stopped() is observed.
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                              Context *on_finish) {
  dout(20) << image_replayer << " global_image_id="
           << image_replayer->get_global_image_id() << ", on_finish="
           << on_finish << dendl;

  if (image_replayer->is_stopped()) {
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  m_async_op_tracker.start_op();
  // retry context: re-invokes this method until the replayer is stopped
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, new FunctionContext(
      [this, image_replayer, on_finish] (int r) {
        stop_image_replayer(image_replayer, on_finish);
        m_async_op_tracker.finish_op();
      }));

  if (image_replayer->is_running()) {
    image_replayer->stop(ctx, false);
  } else {
    // not running but not yet stopped: poll again after a short delay
    int after = 1;
    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
             << after << " sec (task " << ctx << ")" << dendl;
    ctx = new FunctionContext(
      [this, after, ctx] (int r) {
        Mutex::Locker timer_locker(m_threads->timer_lock);
        m_threads->timer->add_event_after(after, ctx);
      });
    m_threads->work_queue->queue(ctx, 0);
  }
}
399 | ||
// Wait for all tracked async operations to drain, then continue the shut
// down sequence in handle_wait_for_ops().
template <typename I>
void InstanceReplayer<I>::wait_for_ops() {
  dout(20) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);

  m_async_op_tracker.wait_for_ops(ctx);
}
409 | ||
// Async ops have drained; proceed to stop all image replayers.
template <typename I>
void InstanceReplayer<I>::handle_wait_for_ops(int r) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  stop_image_replayers();
}
419 | ||
// Stop every managed image replayer in parallel; once all have stopped,
// handle_stop_image_replayers() runs (via the work queue).
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
  dout(20) << dendl;

  assert(m_lock.is_locked());

  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
      &InstanceReplayer<I>::handle_stop_image_replayers>(this));

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
  for (auto &it : m_image_replayers) {
    stop_image_replayer(it.second, gather_ctx->new_sub());
  }
  gather_ctx->activate();
}
436 | ||
// All replayers have stopped: destroy them, clear the map and complete
// the shut down callback (outside the lock).
template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Context *on_finish = nullptr;
  {
    Mutex::Locker locker(m_lock);

    for (auto &it : m_image_replayers) {
      assert(it.second->is_stopped());
      it.second->destroy();
    }
    m_image_replayers.clear();

    assert(m_on_shut_down != nullptr);
    // take ownership of the callback so it can be completed unlocked
    std::swap(on_finish, m_on_shut_down);
  }
  on_finish->complete(r);
}
458 | ||
// Cancel the pending periodic image-state check timer event, if any.
template <typename I>
void InstanceReplayer<I>::cancel_image_state_check_task() {
  Mutex::Locker timer_locker(m_threads->timer_lock);

  if (m_image_state_check_task == nullptr) {
    // nothing scheduled
    return;
  }

  dout(20) << m_image_state_check_task << dendl;
  // the task must still be pending: it clears itself before firing, so a
  // non-null pointer implies cancel_event() must succeed
  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
  assert(canceled);
  m_image_state_check_task = nullptr;
}
472 | ||
// Arm the periodic image-state check: when it fires it re-arms itself and
// queues an asynchronous start_image_replayers() pass. Caller must hold
// the timer lock.
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
  assert(m_threads->timer_lock.is_locked());
  assert(m_image_state_check_task == nullptr);

  m_image_state_check_task = new FunctionContext(
    [this](int r) {
      assert(m_threads->timer_lock.is_locked());
      // clear before re-arming so the nullptr assert above holds
      m_image_state_check_task = nullptr;
      schedule_image_state_check_task();
      queue_start_image_replayers();
    });

  // interval is a run-time config option
  int after = g_ceph_context->_conf->get_val<int64_t>(
    "rbd_mirror_image_state_check_interval");

  dout(20) << "scheduling image state check after " << after << " sec (task "
           << m_image_state_check_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_image_state_check_task);
}
493 | ||
494 | } // namespace mirror | |
495 | } // namespace rbd | |
496 | ||
497 | template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>; |