#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "json_spirit/json_spirit.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
+#include "librbd/Operations.h"
#include "librbd/Utils.h"
+#include "librbd/deep_copy/Handler.h"
#include "librbd/deep_copy/ImageCopyRequest.h"
#include "librbd/deep_copy/SnapshotCopyRequest.h"
#include "librbd/mirror/snapshot/CreateNonPrimaryRequest.h"
#include "tools/rbd_mirror/image_replayer/snapshot/ApplyImageStateRequest.h"
#include "tools/rbd_mirror/image_replayer/snapshot/StateBuilder.h"
#include "tools/rbd_mirror/image_replayer/snapshot/Utils.h"
+#include <set>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
namespace {
+double round_to_two_places(double value) {
+ return abs(round(value * 100) / 100);
+}
+
template<typename I>
std::pair<uint64_t, librbd::SnapInfo*> get_newest_mirror_snapshot(
I* image_ctx) {
};
template <typename I>
-struct Replayer<I>::C_TrackedOp : public Context {
+struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
Replayer *replayer;
- Context* ctx;
-
- C_TrackedOp(Replayer* replayer, Context* ctx)
- : replayer(replayer), ctx(ctx) {
- replayer->m_in_flight_op_tracker.start_op();
- }
- void finish(int r) override {
- ctx->complete(r);
- replayer->m_in_flight_op_tracker.finish_op();
+ DeepCopyHandler(Replayer* replayer) : replayer(replayer) {
}
-};
-
-template <typename I>
-struct Replayer<I>::ProgressContext : public librbd::ProgressContext {
- Replayer *replayer;
- ProgressContext(Replayer* replayer) : replayer(replayer) {
+ void handle_read(uint64_t bytes_read) override {
+ replayer->handle_copy_image_read(bytes_read);
}
int update_progress(uint64_t object_number, uint64_t object_count) override {
dout(10) << dendl;
ceph_assert(m_state == STATE_COMPLETE);
ceph_assert(m_update_watch_ctx == nullptr);
- ceph_assert(m_progress_ctx == nullptr);
+ ceph_assert(m_deep_copy_handler == nullptr);
}
template <typename I>
replay_state = "syncing";
}
- *description =
- "{"
- "\"replay_state\": \"" + replay_state + "\", " +
- "\"remote_snapshot_timestamp\": " +
- stringify(remote_snap_info->timestamp.sec());
+ json_spirit::mObject root_obj;
+ root_obj["replay_state"] = replay_state;
+ root_obj["remote_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
auto matching_remote_snap_id = util::compute_remote_snap_id(
m_state_builder->local_image_ctx->image_lock,
// use the timestamp from the matching remote image since
// the local snapshot would just be the time the snapshot was
// synced and not the consistency point in time.
- *description += ", "
- "\"local_snapshot_timestamp\": " +
- stringify(matching_remote_snap_it->second.timestamp.sec());
+ root_obj["local_snapshot_timestamp"] =
+ matching_remote_snap_it->second.timestamp.sec();
}
matching_remote_snap_it = m_state_builder->remote_image_ctx->snap_info.find(
if (m_remote_snap_id_end != CEPH_NOSNAP &&
matching_remote_snap_it !=
m_state_builder->remote_image_ctx->snap_info.end()) {
- *description += ", "
- "\"syncing_snapshot_timestamp\": " +
- stringify(remote_snap_info->timestamp.sec()) + ", " +
- "\"syncing_percent\": " + stringify(static_cast<uint32_t>(
+ root_obj["syncing_snapshot_timestamp"] = remote_snap_info->timestamp.sec();
+ root_obj["syncing_percent"] = static_cast<uint64_t>(
100 * m_local_mirror_snap_ns.last_copied_object_number /
- static_cast<float>(std::max<uint64_t>(1U, m_local_object_count))));
+ static_cast<float>(std::max<uint64_t>(1U, m_local_object_count)));
}
- *description +=
- "}";
+ m_bytes_per_second(0);
+ auto bytes_per_second = m_bytes_per_second.get_average();
+ root_obj["bytes_per_second"] = round_to_two_places(bytes_per_second);
+
+ auto bytes_per_snapshot = boost::accumulators::rolling_mean(
+ m_bytes_per_snapshot);
+ root_obj["bytes_per_snapshot"] = round_to_two_places(bytes_per_snapshot);
+
+ auto pending_bytes = bytes_per_snapshot * m_pending_snapshots;
+ if (bytes_per_second > 0 && m_pending_snapshots > 0) {
+ auto seconds_until_synced = round_to_two_places(
+ pending_bytes / bytes_per_second);
+ if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
+ seconds_until_synced = std::numeric_limits<uint64_t>::max();
+ }
+
+ root_obj["seconds_until_synced"] = static_cast<uint64_t>(
+ seconds_until_synced);
+ }
+
+ *description = json_spirit::write(
+ root_obj, json_spirit::remove_trailing_zeros);
local_image_locker.unlock();
remote_image_locker.unlock();
m_remote_snap_id_end = CEPH_NOSNAP;
m_remote_mirror_snap_ns = {};
+ std::set<uint64_t> prune_snap_ids;
+
auto local_image_ctx = m_state_builder->local_image_ctx;
std::shared_lock image_locker{local_image_ctx->image_lock};
for (auto snap_info_it = local_image_ctx->snap_info.begin();
// if remote has new snapshots, we would sync from here
m_local_snap_id_start = local_snap_id;
m_local_snap_id_end = CEPH_NOSNAP;
+
+ if (mirror_ns->mirror_peer_uuids.empty()) {
+ // no other peer will attempt to sync to this snapshot so store as
+ // a candidate for removal
+ prune_snap_ids.insert(local_snap_id);
+ }
} else {
// start snap will be last complete mirror snapshot or initial
// image revision
m_local_snap_id_end = local_snap_id;
+
+ if (mirror_ns->last_copied_object_number == 0) {
+ // snapshot might be missing image state, object-map, etc, so just
+ // delete and re-create it if we haven't started copying data
+ // objects
+ prune_snap_ids.insert(local_snap_id);
+ break;
+ }
}
} else if (mirror_ns->is_primary()) {
if (mirror_ns->complete) {
}
image_locker.unlock();
+ if (m_local_snap_id_start > 0 && m_local_snap_id_end == CEPH_NOSNAP) {
+ // remove candidate that is required for delta snapshot sync
+ prune_snap_ids.erase(m_local_snap_id_start);
+ }
+ if (!prune_snap_ids.empty()) {
+ locker->unlock();
+
+ auto prune_snap_id = *prune_snap_ids.begin();
+ dout(5) << "pruning unused non-primary snapshot " << prune_snap_id << dendl;
+ prune_non_primary_snapshot(prune_snap_id);
+ return;
+ }
+
if (m_local_snap_id_start > 0 || m_local_snap_id_end != CEPH_NOSNAP) {
if (m_local_mirror_snap_ns.is_non_primary() &&
m_local_mirror_snap_ns.primary_mirror_uuid !=
std::unique_lock<ceph::mutex>* locker) {
dout(10) << dendl;
+ m_pending_snapshots = 0;
+
+ std::set<uint64_t> unlink_snap_ids;
bool split_brain = false;
bool remote_demoted = false;
auto remote_image_ctx = m_state_builder->remote_image_ctx;
dout(15) << "remote mirror snapshot: id=" << snap_info_it->first << ", "
<< "mirror_ns=" << *mirror_ns << dendl;
+ remote_demoted = mirror_ns->is_demoted();
if (!mirror_ns->is_primary() && !mirror_ns->is_non_primary()) {
derr << "unknown remote mirror snapshot state" << dendl;
handle_replay_complete(locker, -EINVAL,
"invalid remote mirror snapshot state");
return;
- } else {
- remote_demoted = mirror_ns->is_demoted();
+ } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
+ 0) {
+ dout(15) << "skipping remote snapshot due to missing mirror peer"
+ << dendl;
+ continue;
}
auto remote_snap_id = snap_info_it->first;
ceph_assert(m_local_mirror_snap_ns.primary_mirror_uuid ==
m_state_builder->remote_mirror_uuid);
+ unlink_snap_ids.insert(remote_snap_id);
if (m_local_mirror_snap_ns.complete &&
m_local_mirror_snap_ns.primary_snap_id >= remote_snap_id) {
// skip past completed remote snapshot
m_remote_snap_id_start = remote_snap_id;
+ m_remote_mirror_snap_ns = *mirror_ns;
dout(15) << "skipping synced remote snapshot " << remote_snap_id
<< dendl;
continue;
dout(15) << "skipping synced remote snapshot " << remote_snap_id
<< " while search for in-progress sync" << dendl;
m_remote_snap_id_start = remote_snap_id;
+ m_remote_mirror_snap_ns = *mirror_ns;
continue;
}
} else if (m_local_mirror_snap_ns.state ==
// should not have been able to reach this
ceph_assert(false);
}
- }
-
- // find first snapshot where were are listed as a peer
- if (!mirror_ns->is_primary()) {
+ } else if (!mirror_ns->is_primary()) {
dout(15) << "skipping non-primary remote snapshot" << dendl;
continue;
- } else if (mirror_ns->mirror_peer_uuids.count(m_remote_mirror_peer_uuid) ==
- 0) {
- dout(15) << "skipping remote snapshot due to missing mirror peer"
- << dendl;
+ }
+
+ // found candidate snapshot to sync
+ ++m_pending_snapshots;
+ if (m_remote_snap_id_end != CEPH_NOSNAP) {
continue;
}
+ // first primary snapshot where were are listed as a peer
m_remote_snap_id_end = remote_snap_id;
m_remote_mirror_snap_ns = *mirror_ns;
- break;
}
image_locker.unlock();
+ unlink_snap_ids.erase(m_remote_snap_id_start);
+ unlink_snap_ids.erase(m_remote_snap_id_end);
+ if (!unlink_snap_ids.empty()) {
+ locker->unlock();
+
+ // retry the unlinking process for a remote snapshot that we do not
+ // need anymore
+ auto remote_snap_id = *unlink_snap_ids.begin();
+ dout(10) << "unlinking from remote snapshot " << remote_snap_id << dendl;
+ unlink_peer(remote_snap_id);
+ return;
+ }
+
if (m_remote_snap_id_end != CEPH_NOSNAP) {
dout(10) << "found remote mirror snapshot: "
<< "remote_snap_id_start=" << m_remote_snap_id_start << ", "
notify_status_updated();
}
+template <typename I>
+void Replayer<I>::prune_non_primary_snapshot(uint64_t snap_id) {
+ dout(10) << "snap_id=" << snap_id << dendl;
+
+ auto local_image_ctx = m_state_builder->local_image_ctx;
+ bool snap_valid = false;
+ cls::rbd::SnapshotNamespace snap_namespace;
+ std::string snap_name;
+
+ {
+ std::shared_lock image_locker{local_image_ctx->image_lock};
+ auto snap_info = local_image_ctx->get_snap_info(snap_id);
+ if (snap_info != nullptr) {
+ snap_valid = true;
+ snap_namespace = snap_info->snap_namespace;
+ snap_name = snap_info->name;
+
+ ceph_assert(boost::get<cls::rbd::MirrorSnapshotNamespace>(
+ &snap_namespace) != nullptr);
+ }
+ }
+
+ if (!snap_valid) {
+ load_local_image_meta();
+ return;
+ }
+
+ auto ctx = create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_prune_non_primary_snapshot>(this);
+ local_image_ctx->operations->snap_remove(snap_namespace, snap_name, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_prune_non_primary_snapshot(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ if (r < 0 && r != -ENOENT) {
+ derr << "failed to prune non-primary snapshot: " << cpp_strerror(r)
+ << dendl;
+ handle_replay_complete(r, "failed to prune non-primary snapshot");
+ return;
+ }
+
+ if (is_replay_interrupted()) {
+ return;
+ }
+
+ load_local_image_meta();
+}
+
template <typename I>
void Replayer<I>::copy_snapshots() {
dout(10) << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
template <typename I>
void Replayer<I>::create_non_primary_snapshot() {
- dout(10) << dendl;
+ auto local_image_ctx = m_state_builder->local_image_ctx;
+
+ if (m_local_snap_id_start > 0) {
+ std::shared_lock local_image_locker{local_image_ctx->image_lock};
+
+ auto local_snap_info_it = local_image_ctx->snap_info.find(
+ m_local_snap_id_start);
+ if (local_snap_info_it == local_image_ctx->snap_info.end()) {
+ local_image_locker.unlock();
+
+ derr << "failed to locate local snapshot " << m_local_snap_id_start
+ << dendl;
+ handle_replay_complete(-ENOENT, "failed to locate local start snapshot");
+ return;
+ }
+
+ auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+ &local_snap_info_it->second.snap_namespace);
+ ceph_assert(mirror_ns != nullptr);
+
+ auto remote_image_ctx = m_state_builder->remote_image_ctx;
+ std::shared_lock remote_image_locker{remote_image_ctx->image_lock};
+
+ // (re)build a full mapping from remote to local snap ids for all user
+ // snapshots to support applying image state in the future
+ for (auto& [remote_snap_id, remote_snap_info] :
+ remote_image_ctx->snap_info) {
+ if (remote_snap_id >= m_remote_snap_id_end) {
+ break;
+ }
+
+ // we can ignore all non-user snapshots since image state only includes
+ // user snapshots
+ if (boost::get<cls::rbd::UserSnapshotNamespace>(
+ &remote_snap_info.snap_namespace) == nullptr) {
+ continue;
+ }
+
+ uint64_t local_snap_id = CEPH_NOSNAP;
+ if (mirror_ns->is_demoted() && !m_remote_mirror_snap_ns.is_demoted()) {
+ // if we are creating a non-primary snapshot following a demotion,
+ // re-build the full snapshot sequence since we don't have a valid
+ // snapshot mapping
+ auto local_snap_id_it = local_image_ctx->snap_ids.find(
+ {remote_snap_info.snap_namespace, remote_snap_info.name});
+ if (local_snap_id_it != local_image_ctx->snap_ids.end()) {
+ local_snap_id = local_snap_id_it->second;
+ }
+ } else {
+ auto snap_seq_it = mirror_ns->snap_seqs.find(remote_snap_id);
+ if (snap_seq_it != mirror_ns->snap_seqs.end()) {
+ local_snap_id = snap_seq_it->second;
+ }
+ }
+
+ if (m_local_mirror_snap_ns.snap_seqs.count(remote_snap_id) == 0 &&
+ local_snap_id != CEPH_NOSNAP) {
+ dout(15) << "mapping remote snapshot " << remote_snap_id << " to "
+ << "local snapshot " << local_snap_id << dendl;
+ m_local_mirror_snap_ns.snap_seqs[remote_snap_id] = local_snap_id;
+ }
+ }
+ }
+
+ dout(10) << "demoted=" << m_remote_mirror_snap_ns.is_demoted() << ", "
+ << "primary_mirror_uuid="
+ << m_state_builder->remote_mirror_uuid << ", "
+ << "primary_snap_id=" << m_remote_snap_id_end << ", "
+ << "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
auto ctx = create_context_callback<
Replayer<I>, &Replayer<I>::handle_create_non_primary_snapshot>(this);
auto req = librbd::mirror::snapshot::CreateNonPrimaryRequest<I>::create(
- m_state_builder->local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
+ local_image_ctx, m_remote_mirror_snap_ns.is_demoted(),
m_state_builder->remote_mirror_uuid, m_remote_snap_id_end,
m_local_mirror_snap_ns.snap_seqs, m_image_state, &m_local_snap_id_end, ctx);
req->send();
template <typename I>
void Replayer<I>::request_sync() {
- dout(10) << dendl;
+ if (m_remote_mirror_snap_ns.clean_since_snap_id == m_remote_snap_id_start) {
+ dout(10) << "skipping unnecessary image copy: "
+ << "remote_snap_id_start=" << m_remote_snap_id_start << ", "
+ << "remote_mirror_snap_ns=" << m_remote_mirror_snap_ns << dendl;
+ apply_image_state();
+ return;
+ }
+ dout(10) << dendl;
std::unique_lock locker{m_lock};
+ if (is_replay_interrupted(&locker)) {
+ return;
+ }
+
auto ctx = create_async_context_callback(
m_threads->work_queue, create_context_callback<
Replayer<I>, &Replayer<I>::handle_request_sync>(this));
<< m_local_mirror_snap_ns.last_copied_object_number << ", "
<< "snap_seqs=" << m_local_mirror_snap_ns.snap_seqs << dendl;
- m_progress_ctx = new ProgressContext(this);
+ m_snapshot_bytes = 0;
+ m_deep_copy_handler = new DeepCopyHandler(this);
auto ctx = create_context_callback<
Replayer<I>, &Replayer<I>::handle_copy_image>(this);
auto req = librbd::deep_copy::ImageCopyRequest<I>::create(
librbd::deep_copy::ObjectNumber{
m_local_mirror_snap_ns.last_copied_object_number} :
librbd::deep_copy::ObjectNumber{}),
- m_local_mirror_snap_ns.snap_seqs, m_progress_ctx, ctx);
+ m_local_mirror_snap_ns.snap_seqs, m_deep_copy_handler, ctx);
req->send();
}
void Replayer<I>::handle_copy_image(int r) {
dout(10) << "r=" << r << dendl;
- delete m_progress_ctx;
- m_progress_ctx = nullptr;
+ delete m_deep_copy_handler;
+ m_deep_copy_handler = nullptr;
if (r < 0) {
derr << "failed to copy remote image to local image: " << cpp_strerror(r)
return;
}
+ {
+ std::unique_lock locker{m_lock};
+ m_bytes_per_snapshot(m_snapshot_bytes);
+ m_snapshot_bytes = 0;
+ }
+
apply_image_state();
}
update_non_primary_snapshot(false);
}
+template <typename I>
+void Replayer<I>::handle_copy_image_read(uint64_t bytes_read) {
+ dout(20) << "bytes_read=" << bytes_read << dendl;
+
+ std::unique_lock locker{m_lock};
+ m_bytes_per_second(bytes_read);
+ m_snapshot_bytes += bytes_read;
+}
+
template <typename I>
void Replayer<I>::apply_image_state() {
dout(10) << dendl;
&op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
m_local_mirror_snap_ns.last_copied_object_number);
- auto ctx = new C_TrackedOp(this, new LambdaContext([this, complete](int r) {
+ auto ctx = new C_TrackedOp(
+ m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
handle_update_non_primary_snapshot(complete, r);
}));
auto aio_comp = create_rados_callback(ctx);
derr << "failed to notify local image update: " << cpp_strerror(r) << dendl;
}
- unlink_peer();
+ unlink_peer(m_remote_snap_id_start);
}
template <typename I>
-void Replayer<I>::unlink_peer() {
- if (m_remote_snap_id_start == 0) {
+void Replayer<I>::unlink_peer(uint64_t remote_snap_id) {
+ if (remote_snap_id == 0) {
finish_sync();
return;
}
// local snapshot fully synced -- we no longer depend on the sync
// start snapshot in the remote image
- dout(10) << "remote_snap_id=" << m_remote_snap_id_start << dendl;
+ dout(10) << "remote_snap_id=" << remote_snap_id << dendl;
auto ctx = create_context_callback<
Replayer<I>, &Replayer<I>::handle_unlink_peer>(this);
auto req = librbd::mirror::snapshot::UnlinkPeerRequest<I>::create(
- m_state_builder->remote_image_ctx, m_remote_snap_id_start,
+ m_state_builder->remote_image_ctx, remote_snap_id,
m_remote_mirror_peer_uuid, ctx);
req->send();
}
std::unique_lock locker{m_lock};
notify_status_updated();
- m_sync_in_progress = false;
- m_instance_watcher->notify_sync_complete(
- m_state_builder->local_image_ctx->id);
+ if (m_sync_in_progress) {
+ m_sync_in_progress = false;
+ m_instance_watcher->notify_sync_complete(
+ m_state_builder->local_image_ctx->id);
+ }
}
if (is_replay_interrupted()) {
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
dout(10) << dendl;
- auto ctx = new C_TrackedOp(this, new LambdaContext(
+ auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
[this](int) {
m_replayer_listener->handle_notification();
}));