]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "NamespaceReplayer.h" | |
9f95a23c TL |
5 | #include "common/Formatter.h" |
6 | #include "common/debug.h" | |
7 | #include "common/errno.h" | |
8 | #include "cls/rbd/cls_rbd_client.h" | |
9 | #include "librbd/Utils.h" | |
10 | #include "librbd/api/Config.h" | |
11 | #include "librbd/api/Mirror.h" | |
f67539c2 | 12 | #include "librbd/asio/ContextWQ.h" |
9f95a23c TL |
13 | #include "ServiceDaemon.h" |
14 | #include "Threads.h" | |
15 | ||
16 | #define dout_context g_ceph_context | |
17 | #define dout_subsys ceph_subsys_rbd_mirror | |
18 | #undef dout_prefix | |
19 | #define dout_prefix *_dout << "rbd::mirror::NamespaceReplayer: " \ | |
20 | << this << " " << __func__ << ": " | |
21 | ||
22 | using librbd::util::create_async_context_callback; | |
23 | using librbd::util::create_context_callback; | |
24 | ||
25 | namespace rbd { | |
26 | namespace mirror { | |
27 | ||
28 | using ::operator<<; | |
29 | ||
namespace {

// service-daemon attribute keys used to report per-namespace image counts
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

} // anonymous namespace
36 | ||
37 | template <typename I> | |
38 | NamespaceReplayer<I>::NamespaceReplayer( | |
39 | const std::string &name, | |
40 | librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, | |
41 | const std::string &local_mirror_uuid, | |
42 | const std::string& local_mirror_peer_uuid, | |
43 | const RemotePoolMeta& remote_pool_meta, | |
44 | Threads<I> *threads, | |
45 | Throttler<I> *image_sync_throttler, | |
46 | Throttler<I> *image_deletion_throttler, | |
47 | ServiceDaemon<I> *service_daemon, | |
48 | journal::CacheManagerHandler *cache_manager_handler, | |
49 | PoolMetaCache* pool_meta_cache) : | |
50 | m_namespace_name(name), | |
51 | m_local_mirror_uuid(local_mirror_uuid), | |
52 | m_local_mirror_peer_uuid(local_mirror_peer_uuid), | |
53 | m_remote_pool_meta(remote_pool_meta), | |
54 | m_threads(threads), m_image_sync_throttler(image_sync_throttler), | |
55 | m_image_deletion_throttler(image_deletion_throttler), | |
56 | m_service_daemon(service_daemon), | |
57 | m_cache_manager_handler(cache_manager_handler), | |
58 | m_pool_meta_cache(pool_meta_cache), | |
59 | m_lock(ceph::make_mutex(librbd::util::unique_lock_name( | |
60 | "rbd::mirror::NamespaceReplayer " + name, this))), | |
61 | m_local_pool_watcher_listener(this, true), | |
62 | m_remote_pool_watcher_listener(this, false), | |
63 | m_image_map_listener(this) { | |
64 | dout(10) << name << dendl; | |
65 | ||
66 | m_local_io_ctx.dup(local_io_ctx); | |
67 | m_local_io_ctx.set_namespace(name); | |
68 | m_remote_io_ctx.dup(remote_io_ctx); | |
69 | m_remote_io_ctx.set_namespace(name); | |
70 | } | |
71 | ||
72 | template <typename I> | |
f67539c2 | 73 | bool NamespaceReplayer<I>::is_blocklisted() const { |
9f95a23c | 74 | std::lock_guard locker{m_lock}; |
f67539c2 | 75 | return m_instance_replayer->is_blocklisted() || |
9f95a23c | 76 | (m_local_pool_watcher && |
f67539c2 | 77 | m_local_pool_watcher->is_blocklisted()) || |
9f95a23c | 78 | (m_remote_pool_watcher && |
f67539c2 | 79 | m_remote_pool_watcher->is_blocklisted()); |
9f95a23c TL |
80 | } |
81 | ||
82 | template <typename I> | |
83 | void NamespaceReplayer<I>::init(Context *on_finish) { | |
84 | dout(20) << dendl; | |
85 | ||
86 | std::lock_guard locker{m_lock}; | |
87 | ||
88 | ceph_assert(m_on_finish == nullptr); | |
89 | m_on_finish = on_finish; | |
90 | ||
91 | init_local_status_updater(); | |
92 | } | |
93 | ||
94 | ||
95 | template <typename I> | |
96 | void NamespaceReplayer<I>::shut_down(Context *on_finish) { | |
97 | dout(20) << dendl; | |
98 | ||
99 | { | |
100 | std::lock_guard locker{m_lock}; | |
101 | ||
102 | ceph_assert(m_on_finish == nullptr); | |
103 | m_on_finish = on_finish; | |
104 | ||
105 | if (!m_image_map) { | |
106 | stop_instance_replayer(); | |
107 | return; | |
108 | } | |
109 | } | |
110 | ||
111 | auto ctx = new LambdaContext( | |
112 | [this] (int r) { | |
113 | std::lock_guard locker{m_lock}; | |
114 | stop_instance_replayer(); | |
115 | }); | |
116 | handle_release_leader(ctx); | |
117 | } | |
118 | ||
119 | template <typename I> | |
120 | void NamespaceReplayer<I>::print_status(Formatter *f) | |
121 | { | |
122 | dout(20) << dendl; | |
123 | ||
124 | ceph_assert(f); | |
125 | ||
126 | std::lock_guard locker{m_lock}; | |
127 | ||
128 | m_instance_replayer->print_status(f); | |
129 | ||
130 | if (m_image_deleter) { | |
131 | f->open_object_section("image_deleter"); | |
132 | m_image_deleter->print_status(f); | |
133 | f->close_section(); | |
134 | } | |
135 | } | |
136 | ||
137 | template <typename I> | |
138 | void NamespaceReplayer<I>::start() | |
139 | { | |
140 | dout(20) << dendl; | |
141 | ||
142 | std::lock_guard locker{m_lock}; | |
143 | ||
144 | m_instance_replayer->start(); | |
145 | } | |
146 | ||
147 | template <typename I> | |
148 | void NamespaceReplayer<I>::stop() | |
149 | { | |
150 | dout(20) << dendl; | |
151 | ||
152 | std::lock_guard locker{m_lock}; | |
153 | ||
154 | m_instance_replayer->stop(); | |
155 | } | |
156 | ||
157 | template <typename I> | |
158 | void NamespaceReplayer<I>::restart() | |
159 | { | |
160 | dout(20) << dendl; | |
161 | ||
162 | std::lock_guard locker{m_lock}; | |
163 | ||
164 | m_instance_replayer->restart(); | |
165 | } | |
166 | ||
167 | template <typename I> | |
168 | void NamespaceReplayer<I>::flush() | |
169 | { | |
170 | dout(20) << dendl; | |
171 | ||
172 | std::lock_guard locker{m_lock}; | |
173 | ||
174 | m_instance_replayer->flush(); | |
175 | } | |
176 | ||
177 | template <typename I> | |
178 | void NamespaceReplayer<I>::handle_update(const std::string &mirror_uuid, | |
179 | ImageIds &&added_image_ids, | |
180 | ImageIds &&removed_image_ids) { | |
181 | std::lock_guard locker{m_lock}; | |
182 | ||
183 | if (!m_image_map) { | |
184 | dout(20) << "not leader" << dendl; | |
185 | return; | |
186 | } | |
187 | ||
188 | dout(10) << "mirror_uuid=" << mirror_uuid << ", " | |
189 | << "added_count=" << added_image_ids.size() << ", " | |
190 | << "removed_count=" << removed_image_ids.size() << dendl; | |
191 | ||
192 | m_service_daemon->add_or_update_namespace_attribute( | |
193 | m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), | |
194 | SERVICE_DAEMON_LOCAL_COUNT_KEY, m_local_pool_watcher->get_image_count()); | |
195 | if (m_remote_pool_watcher) { | |
196 | m_service_daemon->add_or_update_namespace_attribute( | |
197 | m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(), | |
198 | SERVICE_DAEMON_REMOTE_COUNT_KEY, | |
199 | m_remote_pool_watcher->get_image_count()); | |
200 | } | |
201 | ||
202 | std::set<std::string> added_global_image_ids; | |
203 | for (auto& image_id : added_image_ids) { | |
204 | added_global_image_ids.insert(image_id.global_id); | |
205 | } | |
206 | ||
207 | std::set<std::string> removed_global_image_ids; | |
208 | for (auto& image_id : removed_image_ids) { | |
209 | removed_global_image_ids.insert(image_id.global_id); | |
210 | } | |
211 | ||
212 | m_image_map->update_images(mirror_uuid, | |
213 | std::move(added_global_image_ids), | |
214 | std::move(removed_global_image_ids)); | |
215 | } | |
216 | ||
217 | template <typename I> | |
218 | void NamespaceReplayer<I>::handle_acquire_leader(Context *on_finish) { | |
219 | dout(10) << dendl; | |
220 | ||
221 | m_instance_watcher->handle_acquire_leader(); | |
222 | ||
223 | init_image_map(on_finish); | |
224 | } | |
225 | ||
226 | template <typename I> | |
227 | void NamespaceReplayer<I>::handle_release_leader(Context *on_finish) { | |
228 | dout(10) << dendl; | |
229 | ||
230 | m_instance_watcher->handle_release_leader(); | |
231 | shut_down_image_deleter(on_finish); | |
232 | } | |
233 | ||
234 | template <typename I> | |
235 | void NamespaceReplayer<I>::handle_update_leader( | |
236 | const std::string &leader_instance_id) { | |
237 | dout(10) << "leader_instance_id=" << leader_instance_id << dendl; | |
238 | ||
239 | m_instance_watcher->handle_update_leader(leader_instance_id); | |
240 | } | |
241 | ||
242 | template <typename I> | |
243 | void NamespaceReplayer<I>::handle_instances_added( | |
244 | const std::vector<std::string> &instance_ids) { | |
245 | dout(10) << "instance_ids=" << instance_ids << dendl; | |
246 | ||
247 | std::lock_guard locker{m_lock}; | |
248 | ||
249 | if (!m_image_map) { | |
250 | return; | |
251 | } | |
252 | ||
253 | m_image_map->update_instances_added(instance_ids); | |
254 | } | |
255 | ||
256 | template <typename I> | |
257 | void NamespaceReplayer<I>::handle_instances_removed( | |
258 | const std::vector<std::string> &instance_ids) { | |
259 | dout(10) << "instance_ids=" << instance_ids << dendl; | |
260 | ||
261 | std::lock_guard locker{m_lock}; | |
262 | ||
263 | if (!m_image_map) { | |
264 | return; | |
265 | } | |
266 | ||
267 | m_image_map->update_instances_removed(instance_ids); | |
268 | } | |
269 | ||
270 | template <typename I> | |
271 | void NamespaceReplayer<I>::init_local_status_updater() { | |
272 | dout(10) << dendl; | |
273 | ||
274 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
275 | ceph_assert(!m_local_status_updater); | |
276 | ||
277 | m_local_status_updater.reset(MirrorStatusUpdater<I>::create( | |
278 | m_local_io_ctx, m_threads, "")); | |
279 | auto ctx = create_context_callback< | |
280 | NamespaceReplayer<I>, | |
281 | &NamespaceReplayer<I>::handle_init_local_status_updater>(this); | |
282 | ||
283 | m_local_status_updater->init(ctx); | |
284 | } | |
285 | ||
286 | template <typename I> | |
287 | void NamespaceReplayer<I>::handle_init_local_status_updater(int r) { | |
288 | dout(10) << "r=" << r << dendl; | |
289 | ||
290 | std::lock_guard locker{m_lock}; | |
291 | ||
292 | if (r < 0) { | |
293 | derr << "error initializing local mirror status updater: " | |
294 | << cpp_strerror(r) << dendl; | |
295 | ||
296 | m_local_status_updater.reset(); | |
297 | ceph_assert(m_on_finish != nullptr); | |
298 | m_threads->work_queue->queue(m_on_finish, r); | |
299 | m_on_finish = nullptr; | |
300 | return; | |
301 | } | |
302 | ||
303 | init_remote_status_updater(); | |
304 | } | |
305 | ||
306 | template <typename I> | |
307 | void NamespaceReplayer<I>::init_remote_status_updater() { | |
308 | dout(10) << dendl; | |
309 | ||
310 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
311 | ceph_assert(!m_remote_status_updater); | |
312 | ||
313 | m_remote_status_updater.reset(MirrorStatusUpdater<I>::create( | |
314 | m_remote_io_ctx, m_threads, m_local_mirror_uuid)); | |
315 | auto ctx = create_context_callback< | |
316 | NamespaceReplayer<I>, | |
317 | &NamespaceReplayer<I>::handle_init_remote_status_updater>(this); | |
318 | m_remote_status_updater->init(ctx); | |
319 | } | |
320 | ||
321 | template <typename I> | |
322 | void NamespaceReplayer<I>::handle_init_remote_status_updater(int r) { | |
323 | dout(10) << "r=" << r << dendl; | |
324 | ||
325 | std::lock_guard locker{m_lock}; | |
326 | ||
327 | if (r < 0) { | |
328 | derr << "error initializing remote mirror status updater: " | |
329 | << cpp_strerror(r) << dendl; | |
330 | ||
331 | m_remote_status_updater.reset(); | |
332 | m_ret_val = r; | |
333 | shut_down_local_status_updater(); | |
334 | return; | |
335 | } | |
336 | ||
337 | init_instance_replayer(); | |
338 | } | |
339 | ||
340 | template <typename I> | |
341 | void NamespaceReplayer<I>::init_instance_replayer() { | |
342 | dout(10) << dendl; | |
343 | ||
344 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
345 | ceph_assert(!m_instance_replayer); | |
346 | ||
347 | m_instance_replayer.reset(InstanceReplayer<I>::create( | |
348 | m_local_io_ctx, m_local_mirror_uuid, m_threads, m_service_daemon, | |
349 | m_local_status_updater.get(), m_cache_manager_handler, | |
350 | m_pool_meta_cache)); | |
351 | auto ctx = create_context_callback<NamespaceReplayer<I>, | |
352 | &NamespaceReplayer<I>::handle_init_instance_replayer>(this); | |
353 | ||
354 | m_instance_replayer->init(ctx); | |
355 | } | |
356 | ||
357 | template <typename I> | |
358 | void NamespaceReplayer<I>::handle_init_instance_replayer(int r) { | |
359 | dout(10) << "r=" << r << dendl; | |
360 | ||
361 | std::lock_guard locker{m_lock}; | |
362 | ||
363 | if (r < 0) { | |
364 | derr << "error initializing instance replayer: " << cpp_strerror(r) | |
365 | << dendl; | |
366 | ||
367 | m_instance_replayer.reset(); | |
368 | m_ret_val = r; | |
369 | shut_down_remote_status_updater(); | |
370 | return; | |
371 | } | |
372 | ||
373 | m_instance_replayer->add_peer({m_local_mirror_peer_uuid, m_remote_io_ctx, | |
374 | m_remote_pool_meta, | |
375 | m_remote_status_updater.get()}); | |
376 | ||
377 | init_instance_watcher(); | |
378 | } | |
379 | ||
380 | template <typename I> | |
381 | void NamespaceReplayer<I>::init_instance_watcher() { | |
382 | dout(10) << dendl; | |
383 | ||
384 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
385 | ceph_assert(!m_instance_watcher); | |
386 | ||
387 | m_instance_watcher.reset(InstanceWatcher<I>::create( | |
f67539c2 | 388 | m_local_io_ctx, *m_threads->asio_engine, m_instance_replayer.get(), |
9f95a23c TL |
389 | m_image_sync_throttler)); |
390 | auto ctx = create_context_callback<NamespaceReplayer<I>, | |
391 | &NamespaceReplayer<I>::handle_init_instance_watcher>(this); | |
392 | ||
393 | m_instance_watcher->init(ctx); | |
394 | } | |
395 | ||
396 | template <typename I> | |
397 | void NamespaceReplayer<I>::handle_init_instance_watcher(int r) { | |
398 | dout(10) << "r=" << r << dendl; | |
399 | ||
400 | std::lock_guard locker{m_lock}; | |
401 | ||
402 | if (r < 0) { | |
403 | derr << "error initializing instance watcher: " << cpp_strerror(r) | |
404 | << dendl; | |
405 | ||
406 | m_instance_watcher.reset(); | |
407 | m_ret_val = r; | |
408 | shut_down_instance_replayer(); | |
409 | return; | |
410 | } | |
411 | ||
412 | ceph_assert(m_on_finish != nullptr); | |
413 | m_threads->work_queue->queue(m_on_finish); | |
414 | m_on_finish = nullptr; | |
415 | } | |
416 | ||
417 | template <typename I> | |
418 | void NamespaceReplayer<I>::stop_instance_replayer() { | |
419 | dout(10) << dendl; | |
420 | ||
421 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
422 | ||
423 | Context *ctx = create_async_context_callback( | |
424 | m_threads->work_queue, create_context_callback<NamespaceReplayer<I>, | |
425 | &NamespaceReplayer<I>::handle_stop_instance_replayer>(this)); | |
426 | ||
427 | m_instance_replayer->stop(ctx); | |
428 | } | |
429 | ||
430 | template <typename I> | |
431 | void NamespaceReplayer<I>::handle_stop_instance_replayer(int r) { | |
432 | dout(10) << "r=" << r << dendl; | |
433 | ||
434 | if (r < 0) { | |
435 | derr << "error stopping instance replayer: " << cpp_strerror(r) << dendl; | |
436 | } | |
437 | ||
438 | std::lock_guard locker{m_lock}; | |
439 | ||
440 | shut_down_instance_watcher(); | |
441 | } | |
442 | ||
443 | template <typename I> | |
444 | void NamespaceReplayer<I>::shut_down_instance_watcher() { | |
445 | dout(10) << dendl; | |
446 | ||
447 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
448 | ceph_assert(m_instance_watcher); | |
449 | ||
450 | Context *ctx = create_async_context_callback( | |
451 | m_threads->work_queue, create_context_callback<NamespaceReplayer<I>, | |
452 | &NamespaceReplayer<I>::handle_shut_down_instance_watcher>(this)); | |
453 | ||
454 | m_instance_watcher->shut_down(ctx); | |
455 | } | |
456 | ||
457 | template <typename I> | |
458 | void NamespaceReplayer<I>::handle_shut_down_instance_watcher(int r) { | |
459 | dout(10) << "r=" << r << dendl; | |
460 | ||
461 | if (r < 0) { | |
462 | derr << "error shutting instance watcher down: " << cpp_strerror(r) | |
463 | << dendl; | |
464 | } | |
465 | ||
466 | std::lock_guard locker{m_lock}; | |
467 | ||
468 | m_instance_watcher.reset(); | |
469 | ||
470 | shut_down_instance_replayer(); | |
471 | } | |
472 | ||
473 | template <typename I> | |
474 | void NamespaceReplayer<I>::shut_down_instance_replayer() { | |
475 | dout(10) << dendl; | |
476 | ||
477 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
478 | ceph_assert(m_instance_replayer); | |
479 | ||
480 | Context *ctx = create_async_context_callback( | |
481 | m_threads->work_queue, create_context_callback<NamespaceReplayer<I>, | |
482 | &NamespaceReplayer<I>::handle_shut_down_instance_replayer>(this)); | |
483 | ||
484 | m_instance_replayer->shut_down(ctx); | |
485 | } | |
486 | ||
487 | template <typename I> | |
488 | void NamespaceReplayer<I>::handle_shut_down_instance_replayer(int r) { | |
489 | dout(10) << "r=" << r << dendl; | |
490 | ||
491 | if (r < 0) { | |
492 | derr << "error shutting instance replayer down: " << cpp_strerror(r) | |
493 | << dendl; | |
494 | } | |
495 | ||
496 | std::lock_guard locker{m_lock}; | |
497 | ||
498 | m_instance_replayer.reset(); | |
499 | ||
500 | shut_down_remote_status_updater(); | |
501 | } | |
502 | ||
503 | template <typename I> | |
504 | void NamespaceReplayer<I>::shut_down_remote_status_updater() { | |
505 | dout(10) << dendl; | |
506 | ||
507 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
508 | ceph_assert(m_remote_status_updater); | |
509 | ||
510 | auto ctx = create_async_context_callback( | |
511 | m_threads->work_queue, create_context_callback< | |
512 | NamespaceReplayer<I>, | |
513 | &NamespaceReplayer<I>::handle_shut_down_remote_status_updater>(this)); | |
514 | m_remote_status_updater->shut_down(ctx); | |
515 | } | |
516 | ||
517 | template <typename I> | |
518 | void NamespaceReplayer<I>::handle_shut_down_remote_status_updater(int r) { | |
519 | dout(10) << "r=" << r << dendl; | |
520 | ||
521 | if (r < 0) { | |
522 | derr << "error shutting remote mirror status updater down: " | |
523 | << cpp_strerror(r) << dendl; | |
524 | } | |
525 | ||
526 | std::lock_guard locker{m_lock}; | |
527 | m_remote_status_updater.reset(); | |
528 | ||
529 | shut_down_local_status_updater(); | |
530 | } | |
531 | ||
532 | template <typename I> | |
533 | void NamespaceReplayer<I>::shut_down_local_status_updater() { | |
534 | dout(10) << dendl; | |
535 | ||
536 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
537 | ceph_assert(m_local_status_updater); | |
538 | ||
539 | auto ctx = create_async_context_callback( | |
540 | m_threads->work_queue, create_context_callback< | |
541 | NamespaceReplayer<I>, | |
542 | &NamespaceReplayer<I>::handle_shut_down_local_status_updater>(this)); | |
543 | ||
544 | m_local_status_updater->shut_down(ctx); | |
545 | } | |
546 | ||
547 | template <typename I> | |
548 | void NamespaceReplayer<I>::handle_shut_down_local_status_updater(int r) { | |
549 | dout(10) << "r=" << r << dendl; | |
550 | ||
551 | if (r < 0) { | |
552 | derr << "error shutting local mirror status updater down: " | |
553 | << cpp_strerror(r) << dendl; | |
554 | } | |
555 | ||
556 | std::lock_guard locker{m_lock}; | |
557 | ||
558 | m_local_status_updater.reset(); | |
559 | ||
560 | ceph_assert(!m_image_map); | |
561 | ceph_assert(!m_image_deleter); | |
562 | ceph_assert(!m_local_pool_watcher); | |
563 | ceph_assert(!m_remote_pool_watcher); | |
564 | ceph_assert(!m_instance_watcher); | |
565 | ceph_assert(!m_instance_replayer); | |
566 | ||
567 | ceph_assert(m_on_finish != nullptr); | |
568 | m_threads->work_queue->queue(m_on_finish, m_ret_val); | |
569 | m_on_finish = nullptr; | |
570 | m_ret_val = 0; | |
571 | } | |
572 | ||
573 | template <typename I> | |
574 | void NamespaceReplayer<I>::init_image_map(Context *on_finish) { | |
575 | dout(10) << dendl; | |
576 | ||
577 | auto image_map = ImageMap<I>::create(m_local_io_ctx, m_threads, | |
578 | m_instance_watcher->get_instance_id(), | |
579 | m_image_map_listener); | |
580 | ||
581 | auto ctx = new LambdaContext( | |
582 | [this, image_map, on_finish](int r) { | |
583 | handle_init_image_map(r, image_map, on_finish); | |
584 | }); | |
585 | image_map->init(create_async_context_callback( | |
586 | m_threads->work_queue, ctx)); | |
587 | } | |
588 | ||
589 | template <typename I> | |
590 | void NamespaceReplayer<I>::handle_init_image_map(int r, ImageMap<I> *image_map, | |
591 | Context *on_finish) { | |
592 | dout(10) << "r=" << r << dendl; | |
593 | if (r < 0) { | |
594 | derr << "failed to init image map: " << cpp_strerror(r) << dendl; | |
595 | on_finish = new LambdaContext([image_map, on_finish, r](int) { | |
596 | delete image_map; | |
597 | on_finish->complete(r); | |
598 | }); | |
599 | image_map->shut_down(on_finish); | |
600 | return; | |
601 | } | |
602 | ||
603 | ceph_assert(!m_image_map); | |
604 | m_image_map.reset(image_map); | |
605 | ||
606 | init_local_pool_watcher(on_finish); | |
607 | } | |
608 | ||
609 | template <typename I> | |
610 | void NamespaceReplayer<I>::init_local_pool_watcher(Context *on_finish) { | |
611 | dout(10) << dendl; | |
612 | ||
613 | std::lock_guard locker{m_lock}; | |
614 | ceph_assert(!m_local_pool_watcher); | |
615 | m_local_pool_watcher.reset(PoolWatcher<I>::create( | |
616 | m_threads, m_local_io_ctx, m_local_mirror_uuid, | |
617 | m_local_pool_watcher_listener)); | |
618 | ||
619 | // ensure the initial set of local images is up-to-date | |
620 | // after acquiring the leader role | |
621 | auto ctx = new LambdaContext([this, on_finish](int r) { | |
622 | handle_init_local_pool_watcher(r, on_finish); | |
623 | }); | |
624 | m_local_pool_watcher->init(create_async_context_callback( | |
625 | m_threads->work_queue, ctx)); | |
626 | } | |
627 | ||
628 | template <typename I> | |
629 | void NamespaceReplayer<I>::handle_init_local_pool_watcher( | |
630 | int r, Context *on_finish) { | |
631 | dout(10) << "r=" << r << dendl; | |
632 | if (r < 0) { | |
633 | derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; | |
634 | on_finish = new LambdaContext([on_finish, r](int) { | |
635 | on_finish->complete(r); | |
636 | }); | |
637 | shut_down_pool_watchers(on_finish); | |
638 | return; | |
639 | } | |
640 | ||
641 | init_remote_pool_watcher(on_finish); | |
642 | } | |
643 | ||
644 | template <typename I> | |
645 | void NamespaceReplayer<I>::init_remote_pool_watcher(Context *on_finish) { | |
646 | dout(10) << dendl; | |
647 | ||
648 | std::lock_guard locker{m_lock}; | |
649 | ceph_assert(!m_remote_pool_watcher); | |
650 | m_remote_pool_watcher.reset(PoolWatcher<I>::create( | |
651 | m_threads, m_remote_io_ctx, m_remote_pool_meta.mirror_uuid, | |
652 | m_remote_pool_watcher_listener)); | |
653 | ||
654 | auto ctx = new LambdaContext([this, on_finish](int r) { | |
655 | handle_init_remote_pool_watcher(r, on_finish); | |
656 | }); | |
657 | m_remote_pool_watcher->init(create_async_context_callback( | |
658 | m_threads->work_queue, ctx)); | |
659 | } | |
660 | ||
661 | template <typename I> | |
662 | void NamespaceReplayer<I>::handle_init_remote_pool_watcher( | |
663 | int r, Context *on_finish) { | |
664 | dout(10) << "r=" << r << dendl; | |
665 | if (r == -ENOENT) { | |
666 | // Technically nothing to do since the other side doesn't | |
667 | // have mirroring enabled. Eventually the remote pool watcher will | |
668 | // detect images (if mirroring is enabled), so no point propagating | |
669 | // an error which would just busy-spin the state machines. | |
670 | dout(0) << "remote peer does not have mirroring configured" << dendl; | |
671 | } else if (r < 0) { | |
672 | derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl; | |
673 | on_finish = new LambdaContext([on_finish, r](int) { | |
674 | on_finish->complete(r); | |
675 | }); | |
676 | shut_down_pool_watchers(on_finish); | |
677 | return; | |
678 | } | |
679 | ||
680 | init_image_deleter(on_finish); | |
681 | } | |
682 | ||
683 | template <typename I> | |
684 | void NamespaceReplayer<I>::init_image_deleter(Context *on_finish) { | |
685 | dout(10) << dendl; | |
686 | ||
687 | std::lock_guard locker{m_lock}; | |
688 | ceph_assert(!m_image_deleter); | |
689 | ||
690 | on_finish = new LambdaContext([this, on_finish](int r) { | |
691 | handle_init_image_deleter(r, on_finish); | |
692 | }); | |
693 | m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads, | |
694 | m_image_deletion_throttler, | |
695 | m_service_daemon)); | |
696 | m_image_deleter->init(create_async_context_callback( | |
697 | m_threads->work_queue, on_finish)); | |
698 | } | |
699 | ||
700 | template <typename I> | |
701 | void NamespaceReplayer<I>::handle_init_image_deleter( | |
702 | int r, Context *on_finish) { | |
703 | dout(10) << "r=" << r << dendl; | |
704 | if (r < 0) { | |
705 | derr << "failed to init image deleter: " << cpp_strerror(r) << dendl; | |
706 | on_finish = new LambdaContext([on_finish, r](int) { | |
707 | on_finish->complete(r); | |
708 | }); | |
709 | shut_down_image_deleter(on_finish); | |
710 | return; | |
711 | } | |
712 | ||
713 | on_finish->complete(0); | |
714 | } | |
715 | ||
716 | template <typename I> | |
717 | void NamespaceReplayer<I>::shut_down_image_deleter(Context* on_finish) { | |
718 | dout(10) << dendl; | |
719 | { | |
720 | std::lock_guard locker{m_lock}; | |
721 | if (m_image_deleter) { | |
722 | Context *ctx = new LambdaContext([this, on_finish](int r) { | |
723 | handle_shut_down_image_deleter(r, on_finish); | |
724 | }); | |
725 | ctx = create_async_context_callback(m_threads->work_queue, ctx); | |
726 | ||
727 | m_image_deleter->shut_down(ctx); | |
728 | return; | |
729 | } | |
730 | } | |
731 | shut_down_pool_watchers(on_finish); | |
732 | } | |
733 | ||
734 | template <typename I> | |
735 | void NamespaceReplayer<I>::handle_shut_down_image_deleter( | |
736 | int r, Context* on_finish) { | |
737 | dout(10) << "r=" << r << dendl; | |
738 | ||
739 | { | |
740 | std::lock_guard locker{m_lock}; | |
741 | ceph_assert(m_image_deleter); | |
742 | m_image_deleter.reset(); | |
743 | } | |
744 | ||
745 | shut_down_pool_watchers(on_finish); | |
746 | } | |
747 | ||
748 | template <typename I> | |
749 | void NamespaceReplayer<I>::shut_down_pool_watchers(Context *on_finish) { | |
750 | dout(10) << dendl; | |
751 | ||
752 | { | |
753 | std::lock_guard locker{m_lock}; | |
754 | if (m_local_pool_watcher) { | |
755 | Context *ctx = new LambdaContext([this, on_finish](int r) { | |
756 | handle_shut_down_pool_watchers(r, on_finish); | |
757 | }); | |
758 | ctx = create_async_context_callback(m_threads->work_queue, ctx); | |
759 | ||
760 | auto gather_ctx = new C_Gather(g_ceph_context, ctx); | |
761 | m_local_pool_watcher->shut_down(gather_ctx->new_sub()); | |
762 | if (m_remote_pool_watcher) { | |
763 | m_remote_pool_watcher->shut_down(gather_ctx->new_sub()); | |
764 | } | |
765 | gather_ctx->activate(); | |
766 | return; | |
767 | } | |
768 | } | |
769 | ||
770 | on_finish->complete(0); | |
771 | } | |
772 | ||
773 | template <typename I> | |
774 | void NamespaceReplayer<I>::handle_shut_down_pool_watchers( | |
775 | int r, Context *on_finish) { | |
776 | dout(10) << "r=" << r << dendl; | |
777 | ||
778 | { | |
779 | std::lock_guard locker{m_lock}; | |
780 | ceph_assert(m_local_pool_watcher); | |
781 | m_local_pool_watcher.reset(); | |
782 | ||
783 | if (m_remote_pool_watcher) { | |
784 | m_remote_pool_watcher.reset(); | |
785 | } | |
786 | } | |
787 | shut_down_image_map(on_finish); | |
788 | } | |
789 | ||
790 | template <typename I> | |
791 | void NamespaceReplayer<I>::shut_down_image_map(Context *on_finish) { | |
792 | dout(5) << dendl; | |
793 | ||
794 | std::lock_guard locker{m_lock}; | |
795 | if (m_image_map) { | |
796 | on_finish = new LambdaContext( | |
797 | [this, on_finish](int r) { | |
798 | handle_shut_down_image_map(r, on_finish); | |
799 | }); | |
800 | m_image_map->shut_down(create_async_context_callback( | |
801 | m_threads->work_queue, on_finish)); | |
802 | return; | |
803 | } | |
804 | ||
805 | m_threads->work_queue->queue(on_finish); | |
806 | } | |
807 | ||
808 | template <typename I> | |
809 | void NamespaceReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) { | |
810 | dout(5) << "r=" << r << dendl; | |
f67539c2 | 811 | if (r < 0 && r != -EBLOCKLISTED) { |
9f95a23c TL |
812 | derr << "failed to shut down image map: " << cpp_strerror(r) << dendl; |
813 | } | |
814 | ||
815 | std::lock_guard locker{m_lock}; | |
816 | ceph_assert(m_image_map); | |
817 | m_image_map.reset(); | |
818 | ||
819 | m_instance_replayer->release_all(create_async_context_callback( | |
820 | m_threads->work_queue, on_finish)); | |
821 | } | |
822 | ||
823 | template <typename I> | |
824 | void NamespaceReplayer<I>::handle_acquire_image(const std::string &global_image_id, | |
825 | const std::string &instance_id, | |
826 | Context* on_finish) { | |
827 | dout(5) << "global_image_id=" << global_image_id << ", " | |
828 | << "instance_id=" << instance_id << dendl; | |
829 | ||
830 | m_instance_watcher->notify_image_acquire(instance_id, global_image_id, | |
831 | on_finish); | |
832 | } | |
833 | ||
834 | template <typename I> | |
835 | void NamespaceReplayer<I>::handle_release_image(const std::string &global_image_id, | |
836 | const std::string &instance_id, | |
837 | Context* on_finish) { | |
838 | dout(5) << "global_image_id=" << global_image_id << ", " | |
839 | << "instance_id=" << instance_id << dendl; | |
840 | ||
841 | m_instance_watcher->notify_image_release(instance_id, global_image_id, | |
842 | on_finish); | |
843 | } | |
844 | ||
845 | template <typename I> | |
846 | void NamespaceReplayer<I>::handle_remove_image(const std::string &mirror_uuid, | |
847 | const std::string &global_image_id, | |
848 | const std::string &instance_id, | |
849 | Context* on_finish) { | |
850 | ceph_assert(!mirror_uuid.empty()); | |
851 | dout(5) << "mirror_uuid=" << mirror_uuid << ", " | |
852 | << "global_image_id=" << global_image_id << ", " | |
853 | << "instance_id=" << instance_id << dendl; | |
854 | ||
855 | m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id, | |
856 | mirror_uuid, on_finish); | |
857 | } | |
858 | ||
859 | } // namespace mirror | |
860 | } // namespace rbd | |
861 | ||
862 | template class rbd::mirror::NamespaceReplayer<librbd::ImageCtx>; |