]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
update sources to v12.1.3
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
index 40a8c1b43205a6f7d6e79043ff4437050ee9c619..52e60605c758eac5dffa87f4422052f7e902a4e8 100644 (file)
@@ -99,24 +99,15 @@ void InstanceReplayer<I>::shut_down(Context *on_finish) {
 }
 
 template <typename I>
-void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
+void InstanceReplayer<I>::add_peer(std::string peer_uuid,
                                    librados::IoCtx io_ctx) {
-  dout(20) << mirror_uuid << dendl;
+  dout(20) << peer_uuid << dendl;
 
   Mutex::Locker locker(m_lock);
-  auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
+  auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
   assert(result);
 }
 
-template <typename I>
-void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
-  dout(20) << mirror_uuid << dendl;
-
-  Mutex::Locker locker(m_lock);
-  auto result = m_peers.erase(Peer(mirror_uuid));
-  assert(result > 0);
-}
-
 template <typename I>
 void InstanceReplayer<I>::release_all(Context *on_finish) {
   dout(20) << dendl;
@@ -141,18 +132,14 @@ void InstanceReplayer<I>::release_all(Context *on_finish) {
 template <typename I>
 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                         const std::string &global_image_id,
-                                        const std::string &peer_mirror_uuid,
-                                        const std::string &peer_image_id,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
-           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+  dout(20) << "global_image_id=" << global_image_id << dendl;
 
   Mutex::Locker locker(m_lock);
 
   assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
-
   if (it == m_image_replayers.end()) {
     auto image_replayer = ImageReplayer<I>::create(
         m_threads, m_image_deleter, instance_watcher, m_local_rados,
@@ -163,36 +150,30 @@ void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
 
     it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                  image_replayer)).first;
+
+    // TODO only a single peer is currently supported
+    assert(m_peers.size() == 1);
+    auto peer = *m_peers.begin();
+    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
   }
 
-  auto image_replayer = it->second;
-  if (!peer_mirror_uuid.empty()) {
-    auto iter = m_peers.find(Peer(peer_mirror_uuid));
-    assert(iter != m_peers.end());
-    auto io_ctx = iter->io_ctx;
+  auto& image_replayer = it->second;
+  // TODO temporary until policy integrated
+  image_replayer->set_finished(false);
 
-    image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
-  }
   start_image_replayer(image_replayer);
-
   m_threads->work_queue->queue(on_finish, 0);
 }
 
 template <typename I>
 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
-                                        const std::string &peer_mirror_uuid,
-                                        const std::string &peer_image_id,
-                                        bool schedule_delete,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
-           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+  dout(20) << "global_image_id=" << global_image_id << dendl;
 
   Mutex::Locker locker(m_lock);
-
   assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
-
   if (it == m_image_replayers.end()) {
     dout(20) << global_image_id << ": not found" << dendl;
     m_threads->work_queue->queue(on_finish, 0);
@@ -200,17 +181,6 @@ void InstanceReplayer<I>::release_image(const std::string &global_image_id,
   }
 
   auto image_replayer = it->second;
-  if (!peer_mirror_uuid.empty()) {
-    image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
-                                       schedule_delete);
-  }
-
-  if (!image_replayer->remote_images_empty()) {
-    dout(20) << global_image_id << ": still has peer images" << dendl;
-    m_threads->work_queue->queue(on_finish, 0);
-    return;
-  }
-
   m_image_replayers.erase(it);
 
   on_finish = new FunctionContext(
@@ -218,18 +188,29 @@ void InstanceReplayer<I>::release_image(const std::string &global_image_id,
       image_replayer->destroy();
       on_finish->complete(0);
     });
+  stop_image_replayer(image_replayer, on_finish);
+}
 
-  if (schedule_delete) {
-    on_finish = new FunctionContext(
-      [this, image_replayer, on_finish] (int r) {
-        auto global_image_id = image_replayer->get_global_image_id();
-        m_image_deleter->schedule_image_delete(
-          m_local_rados, m_local_pool_id, global_image_id, false);
-        on_finish->complete(0);
-      });
-  }
+template <typename I>
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+                                            const std::string &peer_mirror_uuid,
+                                            Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
 
-  stop_image_replayer(image_replayer, on_finish);
+  Mutex::Locker locker(m_lock);
+  assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+  if (it != m_image_replayers.end()) {
+    // TODO only a single peer is currently supported, therefore
+    // we can just interrupt the current image replayer and
+    // it will eventually detect that the peer image is missing and
+    // determine if a delete propagation is required.
+    auto image_replayer = it->second;
+    image_replayer->restart();
+  }
+  m_threads->work_queue->queue(on_finish, 0);
 }
 
 template <typename I>
@@ -321,36 +302,16 @@ void InstanceReplayer<I>::start_image_replayer(
   } else if (image_replayer->is_blacklisted()) {
     derr << "blacklisted detected during image replay" << dendl;
     return;
+  } else if (image_replayer->is_finished()) {
+    // TODO temporary until policy integrated
+    dout(5) << "removing image replayer for global_image_id="
+            << global_image_id << dendl;
+    m_image_replayers.erase(image_replayer->get_global_image_id());
+    image_replayer->destroy();
+    return;
   }
 
-  FunctionContext *ctx = new FunctionContext(
-    [this, global_image_id] (int r) {
-      dout(20) << "image deleter result: r=" << r << ", "
-               << "global_image_id=" << global_image_id << dendl;
-
-      Mutex::Locker locker(m_lock);
-      m_async_op_tracker.finish_op();
-
-      if (r == -ESTALE || r == -ECANCELED) {
-        return;
-      }
-
-      auto it = m_image_replayers.find(global_image_id);
-      if (it == m_image_replayers.end()) {
-        return;
-      }
-
-      auto image_replayer = it->second;
-      if (r >= 0) {
-        image_replayer->start(nullptr, false);
-      } else {
-        start_image_replayer(image_replayer);
-      }
-    });
-
-  m_async_op_tracker.start_op();
-  m_image_deleter->wait_for_scheduled_deletion(
-    m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+  image_replayer->start(nullptr, false);
 }
 
 template <typename I>
@@ -372,19 +333,23 @@ void InstanceReplayer<I>::start_image_replayers(int r) {
     return;
   }
 
-  size_t image_count = 0;
-  size_t warning_count = 0;
-  size_t error_count = 0;
-  for (auto &it : m_image_replayers) {
+  uint64_t image_count = 0;
+  uint64_t warning_count = 0;
+  uint64_t error_count = 0;
+  for (auto it = m_image_replayers.begin();
+       it != m_image_replayers.end();) {
+    auto current_it(it);
+    ++it;
+
     ++image_count;
-    auto health_state = it.second->get_health_state();
+    auto health_state = current_it->second->get_health_state();
     if (health_state == image_replayer::HEALTH_STATE_WARNING) {
       ++warning_count;
     } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
       ++error_count;
     }
 
-    start_image_replayer(it.second);
+    start_image_replayer(current_it->second);
   }
 
   m_service_daemon->add_or_update_attribute(