}
template <typename I>
-void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
+void InstanceReplayer<I>::add_peer(std::string peer_uuid,
librados::IoCtx io_ctx) {
- dout(20) << mirror_uuid << dendl;
+ dout(20) << peer_uuid << dendl;
Mutex::Locker locker(m_lock);
- auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
+ auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
assert(result);
}
-template <typename I>
-void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
- dout(20) << mirror_uuid << dendl;
-
- Mutex::Locker locker(m_lock);
- auto result = m_peers.erase(Peer(mirror_uuid));
- assert(result > 0);
-}
-
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
dout(20) << dendl;
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
const std::string &global_image_id,
- const std::string &peer_mirror_uuid,
- const std::string &peer_image_id,
Context *on_finish) {
- dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
- << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+ dout(20) << "global_image_id=" << global_image_id << dendl;
Mutex::Locker locker(m_lock);
assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
-
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
m_threads, m_image_deleter, instance_watcher, m_local_rados,
it = m_image_replayers.insert(std::make_pair(global_image_id,
image_replayer)).first;
+
+ // TODO only a single peer is currently supported
+ assert(m_peers.size() == 1);
+ auto peer = *m_peers.begin();
+ image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
}
- auto image_replayer = it->second;
- if (!peer_mirror_uuid.empty()) {
- auto iter = m_peers.find(Peer(peer_mirror_uuid));
- assert(iter != m_peers.end());
- auto io_ctx = iter->io_ctx;
+ auto& image_replayer = it->second;
+ // TODO temporary until policy integrated
+ image_replayer->set_finished(false);
- image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
- }
start_image_replayer(image_replayer);
-
m_threads->work_queue->queue(on_finish, 0);
}
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
- const std::string &peer_mirror_uuid,
- const std::string &peer_image_id,
- bool schedule_delete,
Context *on_finish) {
- dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
- << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+ dout(20) << "global_image_id=" << global_image_id << dendl;
Mutex::Locker locker(m_lock);
-
assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
-
if (it == m_image_replayers.end()) {
dout(20) << global_image_id << ": not found" << dendl;
m_threads->work_queue->queue(on_finish, 0);
}
auto image_replayer = it->second;
- if (!peer_mirror_uuid.empty()) {
- image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
- schedule_delete);
- }
-
- if (!image_replayer->remote_images_empty()) {
- dout(20) << global_image_id << ": still has peer images" << dendl;
- m_threads->work_queue->queue(on_finish, 0);
- return;
- }
-
m_image_replayers.erase(it);
on_finish = new FunctionContext(
image_replayer->destroy();
on_finish->complete(0);
});
+ stop_image_replayer(image_replayer, on_finish);
+}
- if (schedule_delete) {
- on_finish = new FunctionContext(
- [this, image_replayer, on_finish] (int r) {
- auto global_image_id = image_replayer->get_global_image_id();
- m_image_deleter->schedule_image_delete(
- m_local_rados, m_local_pool_id, global_image_id, false);
- on_finish->complete(0);
- });
- }
+template <typename I>
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
- stop_image_replayer(image_replayer, on_finish);
+ Mutex::Locker locker(m_lock);
+ assert(m_on_shut_down == nullptr);
+
+ auto it = m_image_replayers.find(global_image_id);
+ if (it != m_image_replayers.end()) {
+ // TODO only a single peer is currently supported, therefore
+ // we can just interrupt the current image replayer and
+ // it will eventually detect that the peer image is missing and
+ // determine if a delete propagation is required.
+ auto image_replayer = it->second;
+ image_replayer->restart();
+ }
+ m_threads->work_queue->queue(on_finish, 0);
}
template <typename I>
} else if (image_replayer->is_blacklisted()) {
derr << "blacklisted detected during image replay" << dendl;
return;
+ } else if (image_replayer->is_finished()) {
+ // TODO temporary until policy integrated
+ dout(5) << "removing image replayer for global_image_id="
+ << global_image_id << dendl;
+ m_image_replayers.erase(image_replayer->get_global_image_id());
+ image_replayer->destroy();
+ return;
}
- FunctionContext *ctx = new FunctionContext(
- [this, global_image_id] (int r) {
- dout(20) << "image deleter result: r=" << r << ", "
- << "global_image_id=" << global_image_id << dendl;
-
- Mutex::Locker locker(m_lock);
- m_async_op_tracker.finish_op();
-
- if (r == -ESTALE || r == -ECANCELED) {
- return;
- }
-
- auto it = m_image_replayers.find(global_image_id);
- if (it == m_image_replayers.end()) {
- return;
- }
-
- auto image_replayer = it->second;
- if (r >= 0) {
- image_replayer->start(nullptr, false);
- } else {
- start_image_replayer(image_replayer);
- }
- });
-
- m_async_op_tracker.start_op();
- m_image_deleter->wait_for_scheduled_deletion(
- m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+ image_replayer->start(nullptr, false);
}
template <typename I>
return;
}
- size_t image_count = 0;
- size_t warning_count = 0;
- size_t error_count = 0;
- for (auto &it : m_image_replayers) {
+ uint64_t image_count = 0;
+ uint64_t warning_count = 0;
+ uint64_t error_count = 0;
+ for (auto it = m_image_replayers.begin();
+ it != m_image_replayers.end();) {
+ auto current_it(it);
+ ++it;
+
++image_count;
- auto health_state = it.second->get_health_state();
+ auto health_state = current_it->second->get_health_state();
if (health_state == image_replayer::HEALTH_STATE_WARNING) {
++warning_count;
} else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
++error_count;
}
- start_image_replayer(it.second);
+ start_image_replayer(current_it->second);
}
m_service_daemon->add_or_update_attribute(