]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/MirrorStatusUpdater.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / MirrorStatusUpdater.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "tools/rbd_mirror/MirrorStatusUpdater.h"
5#include "include/Context.h"
6#include "include/stringify.h"
7#include "common/debug.h"
8#include "common/errno.h"
9#include "common/Timer.h"
9f95a23c
TL
10#include "librbd/ImageCtx.h"
11#include "librbd/Utils.h"
f67539c2 12#include "librbd/asio/ContextWQ.h"
9f95a23c
TL
13#include "tools/rbd_mirror/MirrorStatusWatcher.h"
14#include "tools/rbd_mirror/Threads.h"
15
16#define dout_context g_ceph_context
17#define dout_subsys ceph_subsys_rbd_mirror
18#undef dout_prefix
19#define dout_prefix *_dout << "rbd::mirror::MirrorStatusUpdater " << this \
20 << " " << __func__ << ": "
21
22namespace rbd {
23namespace mirror {
24
// Period, in seconds, between timer-driven refreshes of every tracked status.
static constexpr double UPDATE_INTERVAL_SECONDS = 30.0;
// Upper bound on the number of status set/remove calls batched into a single
// object write operation.
static constexpr uint32_t MAX_UPDATES_PER_OP = 100U;
27
28using librbd::util::create_context_callback;
29using librbd::util::create_rados_callback;
30
31template <typename I>
32MirrorStatusUpdater<I>::MirrorStatusUpdater(
33 librados::IoCtx& io_ctx, Threads<I> *threads,
34 const std::string& local_mirror_uuid)
35 : m_io_ctx(io_ctx), m_threads(threads),
36 m_local_mirror_uuid(local_mirror_uuid),
37 m_lock(ceph::make_mutex("rbd::mirror::MirrorStatusUpdater " +
38 stringify(m_io_ctx.get_id()))) {
39 dout(10) << "local_mirror_uuid=" << local_mirror_uuid << ", "
40 << "pool_id=" << m_io_ctx.get_id() << dendl;
41}
42
43template <typename I>
44MirrorStatusUpdater<I>::~MirrorStatusUpdater() {
45 ceph_assert(!m_initialized);
46 delete m_mirror_status_watcher;
47}
48
49template <typename I>
50void MirrorStatusUpdater<I>::init(Context* on_finish) {
51 dout(10) << dendl;
52
53 ceph_assert(!m_initialized);
54 m_initialized = true;
55
56 {
57 std::lock_guard timer_locker{m_threads->timer_lock};
58 schedule_timer_task();
59 }
60
61 init_mirror_status_watcher(on_finish);
62}
63
64template <typename I>
65void MirrorStatusUpdater<I>::init_mirror_status_watcher(Context* on_finish) {
66 dout(10) << dendl;
67
68 auto ctx = new LambdaContext([this, on_finish](int r) {
69 handle_init_mirror_status_watcher(r, on_finish);
70 });
71 m_mirror_status_watcher = MirrorStatusWatcher<I>::create(
72 m_io_ctx, m_threads->work_queue);
73 m_mirror_status_watcher->init(ctx);
74}
75
76template <typename I>
77void MirrorStatusUpdater<I>::handle_init_mirror_status_watcher(
78 int r, Context* on_finish) {
79 dout(10) << "r=" << r << dendl;
80
81 if (r < 0) {
82 derr << "failed to init mirror status watcher: " << cpp_strerror(r)
83 << dendl;
84
85 delete m_mirror_status_watcher;
86 m_mirror_status_watcher = nullptr;
87
88 on_finish = new LambdaContext([r, on_finish](int) {
89 on_finish->complete(r);
90 });
91 shut_down(on_finish);
92 return;
93 }
94
95 m_threads->work_queue->queue(on_finish, 0);
96}
97
98template <typename I>
99void MirrorStatusUpdater<I>::shut_down(Context* on_finish) {
100 dout(10) << dendl;
101
102 {
103 std::lock_guard timer_locker{m_threads->timer_lock};
104 ceph_assert(m_timer_task != nullptr);
105 m_threads->timer->cancel_event(m_timer_task);
106 }
107
108 {
109 std::unique_lock locker(m_lock);
110 ceph_assert(m_initialized);
111 m_initialized = false;
112 }
113
114 shut_down_mirror_status_watcher(on_finish);
115}
116
117template <typename I>
118void MirrorStatusUpdater<I>::shut_down_mirror_status_watcher(
119 Context* on_finish) {
120 if (m_mirror_status_watcher == nullptr) {
121 finalize_shutdown(0, on_finish);
122 return;
123 }
124
125 dout(10) << dendl;
126
127 auto ctx = new LambdaContext([this, on_finish](int r) {
128 handle_shut_down_mirror_status_watcher(r, on_finish);
129 });
130 m_mirror_status_watcher->shut_down(ctx);
131}
132
133template <typename I>
134void MirrorStatusUpdater<I>::handle_shut_down_mirror_status_watcher(
135 int r, Context* on_finish) {
136 dout(10) << "r=" << r << dendl;
137
138 if (r < 0) {
139 derr << "failed to shut down mirror status watcher: " << cpp_strerror(r)
140 << dendl;
141 }
142
143 finalize_shutdown(r, on_finish);
144}
145
146template <typename I>
147void MirrorStatusUpdater<I>::finalize_shutdown(int r, Context* on_finish) {
148 dout(10) << dendl;
149
150 {
151 std::unique_lock locker(m_lock);
152 if (m_update_in_progress) {
153 if (r < 0) {
154 on_finish = new LambdaContext([r, on_finish](int) {
155 on_finish->complete(r);
156 });
157 }
158
159 m_update_on_finish_ctxs.push_back(on_finish);
160 return;
161 }
162 }
163
164 m_threads->work_queue->queue(on_finish, r);
165}
166
167template <typename I>
168bool MirrorStatusUpdater<I>::exists(const std::string& global_image_id) {
169 dout(15) << "global_image_id=" << global_image_id << dendl;
170
171 std::unique_lock locker(m_lock);
172 return (m_global_image_status.count(global_image_id) > 0);
173}
174
175template <typename I>
176void MirrorStatusUpdater<I>::set_mirror_image_status(
177 const std::string& global_image_id,
178 const cls::rbd::MirrorImageSiteStatus& mirror_image_site_status,
179 bool immediate_update) {
180 dout(15) << "global_image_id=" << global_image_id << ", "
181 << "mirror_image_site_status=" << mirror_image_site_status << dendl;
182
183 std::unique_lock locker(m_lock);
184
185 m_global_image_status[global_image_id] = mirror_image_site_status;
186 if (immediate_update) {
187 m_update_global_image_ids.insert(global_image_id);
188 queue_update_task(std::move(locker));
189 }
190}
191
a4b75251
TL
192template <typename I>
193void MirrorStatusUpdater<I>::remove_refresh_mirror_image_status(
194 const std::string& global_image_id,
195 Context* on_finish) {
196 if (try_remove_mirror_image_status(global_image_id, false, false,
197 on_finish)) {
198 m_threads->work_queue->queue(on_finish, 0);
199 }
200}
201
9f95a23c
TL
202template <typename I>
203void MirrorStatusUpdater<I>::remove_mirror_image_status(
a4b75251
TL
204 const std::string& global_image_id, bool immediate_update,
205 Context* on_finish) {
206 if (try_remove_mirror_image_status(global_image_id, true, immediate_update,
207 on_finish)) {
9f95a23c
TL
208 m_threads->work_queue->queue(on_finish, 0);
209 }
210}
211
212template <typename I>
213bool MirrorStatusUpdater<I>::try_remove_mirror_image_status(
a4b75251
TL
214 const std::string& global_image_id, bool queue_update,
215 bool immediate_update, Context* on_finish) {
216 dout(15) << "global_image_id=" << global_image_id << ", "
217 << "queue_update=" << queue_update << ", "
218 << "immediate_update=" << immediate_update << dendl;
9f95a23c
TL
219
220 std::unique_lock locker(m_lock);
221 if ((m_update_in_flight &&
222 m_updating_global_image_ids.count(global_image_id) > 0) ||
223 ((m_update_in_progress || m_update_requested) &&
224 m_update_global_image_ids.count(global_image_id) > 0)) {
225 // if update is scheduled/in-progress, wait for it to complete
226 on_finish = new LambdaContext(
a4b75251
TL
227 [this, global_image_id, queue_update, immediate_update,
228 on_finish](int r) {
229 if (try_remove_mirror_image_status(global_image_id, queue_update,
230 immediate_update, on_finish)) {
9f95a23c
TL
231 on_finish->complete(0);
232 }
233 });
234 m_update_on_finish_ctxs.push_back(on_finish);
235 return false;
236 }
237
238 m_global_image_status.erase(global_image_id);
a4b75251
TL
239 if (queue_update) {
240 m_update_global_image_ids.insert(global_image_id);
241 if (immediate_update) {
242 queue_update_task(std::move(locker));
243 }
244 }
245
9f95a23c
TL
246 return true;
247}
248
249template <typename I>
250void MirrorStatusUpdater<I>::schedule_timer_task() {
251 dout(10) << dendl;
252
253 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
254 ceph_assert(m_timer_task == nullptr);
255 m_timer_task = create_context_callback<
256 MirrorStatusUpdater<I>,
257 &MirrorStatusUpdater<I>::handle_timer_task>(this);
258 m_threads->timer->add_event_after(UPDATE_INTERVAL_SECONDS, m_timer_task);
259}
260
261template <typename I>
262void MirrorStatusUpdater<I>::handle_timer_task(int r) {
263 dout(10) << dendl;
264
265 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
266 ceph_assert(m_timer_task != nullptr);
267 m_timer_task = nullptr;
268 schedule_timer_task();
269
270 std::unique_lock locker(m_lock);
271 for (auto& pair : m_global_image_status) {
272 m_update_global_image_ids.insert(pair.first);
273 }
274
275 queue_update_task(std::move(locker));
276}
277
278template <typename I>
279void MirrorStatusUpdater<I>::queue_update_task(
280 std::unique_lock<ceph::mutex>&& locker) {
281 if (!m_initialized) {
282 return;
283 }
284
285 if (m_update_in_progress) {
286 if (m_update_in_flight) {
287 dout(10) << "deferring update due to in-flight ops" << dendl;
288 m_update_requested = true;
289 }
290 return;
291 }
292
293 m_update_in_progress = true;
294 ceph_assert(!m_update_in_flight);
295 ceph_assert(!m_update_requested);
296 locker.unlock();
297
298 dout(10) << dendl;
299 auto ctx = create_context_callback<
300 MirrorStatusUpdater<I>,
301 &MirrorStatusUpdater<I>::update_task>(this);
302 m_threads->work_queue->queue(ctx);
303}
304
305template <typename I>
306void MirrorStatusUpdater<I>::update_task(int r) {
307 dout(10) << dendl;
308
309 std::unique_lock locker(m_lock);
310 ceph_assert(m_update_in_progress);
311 ceph_assert(!m_update_in_flight);
312 m_update_in_flight = true;
313
314 std::swap(m_updating_global_image_ids, m_update_global_image_ids);
315 auto updating_global_image_ids = m_updating_global_image_ids;
316 auto global_image_status = m_global_image_status;
317 locker.unlock();
318
319 Context* ctx = create_context_callback<
320 MirrorStatusUpdater<I>,
321 &MirrorStatusUpdater<I>::handle_update_task>(this);
322 if (updating_global_image_ids.empty()) {
323 ctx->complete(0);
324 return;
325 }
326
327 auto gather = new C_Gather(g_ceph_context, ctx);
328
329 auto it = updating_global_image_ids.begin();
330 while (it != updating_global_image_ids.end()) {
331 librados::ObjectWriteOperation op;
332 uint32_t op_count = 0;
333
334 while (it != updating_global_image_ids.end() &&
335 op_count < MAX_UPDATES_PER_OP) {
336 auto& global_image_id = *it;
337 ++it;
338
339 auto status_it = global_image_status.find(global_image_id);
340 if (status_it == global_image_status.end()) {
a4b75251
TL
341 librbd::cls_client::mirror_image_status_remove(&op, global_image_id);
342 ++op_count;
9f95a23c
TL
343 continue;
344 }
345
346 status_it->second.mirror_uuid = m_local_mirror_uuid;
347 librbd::cls_client::mirror_image_status_set(&op, global_image_id,
348 status_it->second);
349 ++op_count;
350 }
351
352 auto aio_comp = create_rados_callback(gather->new_sub());
353 int r = m_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
354 ceph_assert(r == 0);
355 aio_comp->release();
356 }
357
358 gather->activate();
359}
360
361template <typename I>
362void MirrorStatusUpdater<I>::handle_update_task(int r) {
363 dout(10) << dendl;
364 if (r < 0) {
365 derr << "failed to update mirror image statuses: " << cpp_strerror(r)
366 << dendl;
367 }
368
369 std::unique_lock locker(m_lock);
370
371 Contexts on_finish_ctxs;
372 std::swap(on_finish_ctxs, m_update_on_finish_ctxs);
373
374 ceph_assert(m_update_in_progress);
375 m_update_in_progress = false;
376
377 ceph_assert(m_update_in_flight);
378 m_update_in_flight = false;
379
380 m_updating_global_image_ids.clear();
381
382 if (m_update_requested) {
383 m_update_requested = false;
384 queue_update_task(std::move(locker));
385 } else {
386 locker.unlock();
387 }
388
389 for (auto on_finish : on_finish_ctxs) {
390 on_finish->complete(0);
391 }
392}
393
394} // namespace mirror
395} // namespace rbd
396
397template class rbd::mirror::MirrorStatusUpdater<librbd::ImageCtx>;