]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / journal / ReplayStatusFormatter.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "ReplayStatusFormatter.h"
5#include "common/debug.h"
6#include "common/dout.h"
7#include "common/errno.h"
8#include "journal/Journaler.h"
9#include "librbd/ImageCtx.h"
10#include "librbd/Journal.h"
11#include "librbd/Utils.h"
12
13#define dout_context g_ceph_context
14#define dout_subsys ceph_subsys_rbd_mirror
15#undef dout_prefix
9f95a23c
TL
16#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
17 << "ReplayStatusFormatter: " << this << " " \
18 << __func__ << ": "
7c673cae
FG
19
20namespace rbd {
21namespace mirror {
22namespace image_replayer {
9f95a23c 23namespace journal {
7c673cae
FG
24
25using librbd::util::unique_lock_name;
26
27template <typename I>
28ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
29 const std::string &mirror_uuid)
30 : m_journaler(journaler),
31 m_mirror_uuid(mirror_uuid),
9f95a23c 32 m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
7c673cae
FG
33}
34
35template <typename I>
36bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
37 Context *on_finish) {
38 dout(20) << dendl;
39
40 bool in_progress = false;
41 {
9f95a23c 42 std::lock_guard locker{m_lock};
7c673cae
FG
43 if (m_on_finish) {
44 in_progress = true;
45 } else {
46 m_on_finish = on_finish;
47 }
48 }
49
50 if (in_progress) {
51 dout(10) << "previous request is still in progress, ignoring" << dendl;
52 on_finish->complete(-EAGAIN);
53 return false;
54 }
55
56 m_master_position = cls::journal::ObjectPosition();
57 m_mirror_position = cls::journal::ObjectPosition();
58
59 cls::journal::Client master_client, mirror_client;
60 int r;
61
62 r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
63 &master_client);
64 if (r < 0) {
65 derr << "error retrieving registered master client: "
66 << cpp_strerror(r) << dendl;
67 } else {
68 r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
69 if (r < 0) {
70 derr << "error retrieving registered mirror client: "
71 << cpp_strerror(r) << dendl;
72 }
73 }
74
75 if (!master_client.commit_position.object_positions.empty()) {
76 m_master_position =
77 *(master_client.commit_position.object_positions.begin());
78 }
79
80 if (!mirror_client.commit_position.object_positions.empty()) {
81 m_mirror_position =
82 *(mirror_client.commit_position.object_positions.begin());
83 }
84
85 if (!calculate_behind_master_or_send_update()) {
86 dout(20) << "need to update tag cache" << dendl;
87 return false;
88 }
89
90 format(description);
91
92 {
9f95a23c 93 std::lock_guard locker{m_lock};
11fdf7f2 94 ceph_assert(m_on_finish == on_finish);
7c673cae
FG
95 m_on_finish = nullptr;
96 }
97
98 on_finish->complete(-EEXIST);
99 return true;
100}
101
102template <typename I>
103bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
104 dout(20) << "m_master_position=" << m_master_position
105 << ", m_mirror_position=" << m_mirror_position << dendl;
106
107 m_entries_behind_master = 0;
108
109 if (m_master_position == cls::journal::ObjectPosition() ||
110 m_master_position.tag_tid < m_mirror_position.tag_tid) {
111 return true;
112 }
113
114 cls::journal::ObjectPosition master = m_master_position;
115 uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
116
94b18763 117 while (master.tag_tid > mirror_tag_tid) {
7c673cae
FG
118 auto tag_it = m_tag_cache.find(master.tag_tid);
119 if (tag_it == m_tag_cache.end()) {
120 send_update_tag_cache(master.tag_tid, mirror_tag_tid);
121 return false;
122 }
123 librbd::journal::TagData &tag_data = tag_it->second;
124 m_entries_behind_master += master.entry_tid;
94b18763
FG
125 master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid};
126 }
127 if (master.tag_tid == mirror_tag_tid &&
128 master.entry_tid > m_mirror_position.entry_tid) {
129 m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
7c673cae 130 }
7c673cae
FG
131
132 dout(20) << "clearing tags not needed any more (below mirror position)"
133 << dendl;
134
135 uint64_t tag_tid = mirror_tag_tid;
136 size_t old_size = m_tag_cache.size();
137 while (tag_tid != 0) {
138 auto tag_it = m_tag_cache.find(tag_tid);
139 if (tag_it == m_tag_cache.end()) {
140 break;
141 }
142 librbd::journal::TagData &tag_data = tag_it->second;
143
144 dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid
145 << dendl;
146
147 tag_tid = tag_data.predecessor.tag_tid;
148 m_tag_cache.erase(tag_it);
149 }
150
151 dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
152
153 return true;
154}
155
156template <typename I>
157void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
158 uint64_t mirror_tag_tid) {
94b18763
FG
159 if (master_tag_tid <= mirror_tag_tid ||
160 m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) {
7c673cae
FG
161 Context *on_finish = nullptr;
162 {
9f95a23c 163 std::lock_guard locker{m_lock};
7c673cae
FG
164 std::swap(m_on_finish, on_finish);
165 }
166
11fdf7f2 167 ceph_assert(on_finish);
7c673cae
FG
168 on_finish->complete(0);
169 return;
170 }
171
94b18763
FG
172 dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
173 << mirror_tag_tid << dendl;
174
9f95a23c 175 auto ctx = new LambdaContext(
7c673cae
FG
176 [this, master_tag_tid, mirror_tag_tid](int r) {
177 handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
178 });
179 m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
180}
181
182template <typename I>
183void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
184 uint64_t mirror_tag_tid,
185 int r) {
186 librbd::journal::TagData tag_data;
187
188 if (r < 0) {
189 derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
190 << dendl;
191 } else {
192 dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;
193
11fdf7f2 194 auto it = m_tag.data.cbegin();
7c673cae 195 try {
11fdf7f2 196 decode(tag_data, it);
7c673cae
FG
197 } catch (const buffer::error &err) {
198 derr << "error decoding tag " << master_tag_tid << ": " << err.what()
199 << dendl;
200 }
201 }
202
203 if (tag_data.predecessor.mirror_uuid !=
204 librbd::Journal<>::LOCAL_MIRROR_UUID &&
205 tag_data.predecessor.mirror_uuid !=
206 librbd::Journal<>::ORPHAN_MIRROR_UUID) {
207 dout(20) << "hit remote image non-primary epoch" << dendl;
94b18763 208 tag_data.predecessor = {};
7c673cae
FG
209 }
210
211 dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;
212
94b18763 213 m_tag_cache[master_tag_tid] = tag_data;
7c673cae
FG
214 send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid);
215}
216
217template <typename I>
218void ReplayStatusFormatter<I>::format(std::string *description) {
219
220 dout(20) << "m_master_position=" << m_master_position
221 << ", m_mirror_position=" << m_mirror_position
222 << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
223
224 std::stringstream ss;
225 ss << "master_position=";
226 if (m_master_position == cls::journal::ObjectPosition()) {
227 ss << "[]";
228 } else {
229 ss << m_master_position;
230 }
231 ss << ", mirror_position=";
232 if (m_mirror_position == cls::journal::ObjectPosition()) {
233 ss << "[]";
234 } else {
235 ss << m_mirror_position;
236 }
237 ss << ", entries_behind_master="
238 << (m_entries_behind_master > 0 ? m_entries_behind_master : 0);
239
240 *description = ss.str();
241}
242
9f95a23c 243} // namespace journal
7c673cae
FG
244} // namespace image_replayer
245} // namespace mirror
246} // namespace rbd
247
9f95a23c 248template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;