]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "tools/rbd_mirror/MirrorStatusUpdater.h" | |
5 | #include "include/Context.h" | |
6 | #include "include/stringify.h" | |
7 | #include "common/debug.h" | |
8 | #include "common/errno.h" | |
9 | #include "common/Timer.h" | |
9f95a23c TL |
10 | #include "librbd/ImageCtx.h" |
11 | #include "librbd/Utils.h" | |
f67539c2 | 12 | #include "librbd/asio/ContextWQ.h" |
9f95a23c TL |
13 | #include "tools/rbd_mirror/MirrorStatusWatcher.h" |
14 | #include "tools/rbd_mirror/Threads.h" | |
15 | ||
16 | #define dout_context g_ceph_context | |
17 | #define dout_subsys ceph_subsys_rbd_mirror | |
18 | #undef dout_prefix | |
19 | #define dout_prefix *_dout << "rbd::mirror::MirrorStatusUpdater " << this \ | |
20 | << " " << __func__ << ": " | |
21 | ||
22 | namespace rbd { | |
23 | namespace mirror { | |
24 | ||
25 | static const double UPDATE_INTERVAL_SECONDS = 30; | |
26 | static const uint32_t MAX_UPDATES_PER_OP = 100; | |
27 | ||
28 | using librbd::util::create_context_callback; | |
29 | using librbd::util::create_rados_callback; | |
30 | ||
// Construct an updater bound to a single pool (io_ctx) and local site
// mirror uuid.  No async work is started here; call init() to begin.
template <typename I>
MirrorStatusUpdater<I>::MirrorStatusUpdater(
    librados::IoCtx& io_ctx, Threads<I> *threads,
    const std::string& local_mirror_uuid)
  : m_io_ctx(io_ctx), m_threads(threads),
    m_local_mirror_uuid(local_mirror_uuid),
    // embed the pool id in the lock name so per-pool instances are
    // distinguishable in lockdep/debug output
    m_lock(ceph::make_mutex("rbd::mirror::MirrorStatusUpdater " +
                            stringify(m_io_ctx.get_id()))) {
  dout(10) << "local_mirror_uuid=" << local_mirror_uuid << ", "
           << "pool_id=" << m_io_ctx.get_id() << dendl;
}
42 | ||
template <typename I>
MirrorStatusUpdater<I>::~MirrorStatusUpdater() {
  // shut_down() must have completed (it clears m_initialized) before
  // destruction; the watcher pointer is then safe to delete here
  ceph_assert(!m_initialized);
  delete m_mirror_status_watcher;
}
48 | ||
// Begin operation: start the periodic flush timer and asynchronously
// initialize the pool-level mirror status watcher.  on_finish is completed
// (possibly with an error) once the watcher init finishes.
template <typename I>
void MirrorStatusUpdater<I>::init(Context* on_finish) {
  dout(10) << dendl;

  ceph_assert(!m_initialized);
  m_initialized = true;

  {
    // schedule the first periodic update; shut_down() relies on a timer
    // task always being pending from this point on
    std::lock_guard timer_locker{m_threads->timer_lock};
    schedule_timer_task();
  }

  init_mirror_status_watcher(on_finish);
}
63 | ||
64 | template <typename I> | |
65 | void MirrorStatusUpdater<I>::init_mirror_status_watcher(Context* on_finish) { | |
66 | dout(10) << dendl; | |
67 | ||
68 | auto ctx = new LambdaContext([this, on_finish](int r) { | |
69 | handle_init_mirror_status_watcher(r, on_finish); | |
70 | }); | |
71 | m_mirror_status_watcher = MirrorStatusWatcher<I>::create( | |
72 | m_io_ctx, m_threads->work_queue); | |
73 | m_mirror_status_watcher->init(ctx); | |
74 | } | |
75 | ||
// Completion of the watcher init.  On failure the partially-initialized
// updater is torn down again and the original error is propagated.
template <typename I>
void MirrorStatusUpdater<I>::handle_init_mirror_status_watcher(
    int r, Context* on_finish) {
  dout(10) << "r=" << r << dendl;

  if (r < 0) {
    derr << "failed to init mirror status watcher: " << cpp_strerror(r)
         << dendl;

    delete m_mirror_status_watcher;
    m_mirror_status_watcher = nullptr;

    // wrap on_finish so the init error code is reported to the caller
    // regardless of the result shut_down() itself would pass along
    on_finish = new LambdaContext([r, on_finish](int) {
        on_finish->complete(r);
      });
    shut_down(on_finish);
    return;
  }

  // success: complete the caller via the work queue (async context)
  m_threads->work_queue->queue(on_finish, 0);
}
97 | ||
// Stop operation: cancel the periodic timer, mark uninitialized, then
// asynchronously shut down the watcher.  on_finish completes when teardown
// (including any in-flight status update) has drained.
template <typename I>
void MirrorStatusUpdater<I>::shut_down(Context* on_finish) {
  dout(10) << dendl;

  {
    // init() guarantees a timer task is always pending while initialized
    std::lock_guard timer_locker{m_threads->timer_lock};
    ceph_assert(m_timer_task != nullptr);
    m_threads->timer->cancel_event(m_timer_task);
  }

  {
    // clearing m_initialized prevents queue_update_task from scheduling
    // any new update passes
    std::unique_lock locker(m_lock);
    ceph_assert(m_initialized);
    m_initialized = false;
  }

  shut_down_mirror_status_watcher(on_finish);
}
116 | ||
117 | template <typename I> | |
118 | void MirrorStatusUpdater<I>::shut_down_mirror_status_watcher( | |
119 | Context* on_finish) { | |
120 | if (m_mirror_status_watcher == nullptr) { | |
121 | finalize_shutdown(0, on_finish); | |
122 | return; | |
123 | } | |
124 | ||
125 | dout(10) << dendl; | |
126 | ||
127 | auto ctx = new LambdaContext([this, on_finish](int r) { | |
128 | handle_shut_down_mirror_status_watcher(r, on_finish); | |
129 | }); | |
130 | m_mirror_status_watcher->shut_down(ctx); | |
131 | } | |
132 | ||
// Completion of the watcher shutdown; errors are logged and forwarded to
// finalize_shutdown() for propagation.
template <typename I>
void MirrorStatusUpdater<I>::handle_shut_down_mirror_status_watcher(
    int r, Context* on_finish) {
  dout(10) << "r=" << r << dendl;

  if (r < 0) {
    derr << "failed to shut down mirror status watcher: " << cpp_strerror(r)
         << dendl;
  }

  finalize_shutdown(r, on_finish);
}
145 | ||
// Final shutdown step: if a status update pass is still running, defer
// completion until it drains; otherwise complete on_finish immediately.
template <typename I>
void MirrorStatusUpdater<I>::finalize_shutdown(int r, Context* on_finish) {
  dout(10) << dendl;

  {
    std::unique_lock locker(m_lock);
    if (m_update_in_progress) {
      if (r < 0) {
        // handle_update_task completes queued contexts with 0, so wrap
        // on_finish to preserve the shutdown error code
        on_finish = new LambdaContext([r, on_finish](int) {
            on_finish->complete(r);
          });
      }

      // handle_update_task will fire this once the update completes
      m_update_on_finish_ctxs.push_back(on_finish);
      return;
    }
  }

  m_threads->work_queue->queue(on_finish, r);
}
166 | ||
167 | template <typename I> | |
168 | bool MirrorStatusUpdater<I>::exists(const std::string& global_image_id) { | |
169 | dout(15) << "global_image_id=" << global_image_id << dendl; | |
170 | ||
171 | std::unique_lock locker(m_lock); | |
172 | return (m_global_image_status.count(global_image_id) > 0); | |
173 | } | |
174 | ||
175 | template <typename I> | |
176 | void MirrorStatusUpdater<I>::set_mirror_image_status( | |
177 | const std::string& global_image_id, | |
178 | const cls::rbd::MirrorImageSiteStatus& mirror_image_site_status, | |
179 | bool immediate_update) { | |
180 | dout(15) << "global_image_id=" << global_image_id << ", " | |
181 | << "mirror_image_site_status=" << mirror_image_site_status << dendl; | |
182 | ||
183 | std::unique_lock locker(m_lock); | |
184 | ||
185 | m_global_image_status[global_image_id] = mirror_image_site_status; | |
186 | if (immediate_update) { | |
187 | m_update_global_image_ids.insert(global_image_id); | |
188 | queue_update_task(std::move(locker)); | |
189 | } | |
190 | } | |
191 | ||
a4b75251 TL |
192 | template <typename I> |
193 | void MirrorStatusUpdater<I>::remove_refresh_mirror_image_status( | |
194 | const std::string& global_image_id, | |
195 | Context* on_finish) { | |
196 | if (try_remove_mirror_image_status(global_image_id, false, false, | |
197 | on_finish)) { | |
198 | m_threads->work_queue->queue(on_finish, 0); | |
199 | } | |
200 | } | |
201 | ||
9f95a23c TL |
202 | template <typename I> |
203 | void MirrorStatusUpdater<I>::remove_mirror_image_status( | |
a4b75251 TL |
204 | const std::string& global_image_id, bool immediate_update, |
205 | Context* on_finish) { | |
206 | if (try_remove_mirror_image_status(global_image_id, true, immediate_update, | |
207 | on_finish)) { | |
9f95a23c TL |
208 | m_threads->work_queue->queue(on_finish, 0); |
209 | } | |
210 | } | |
211 | ||
// Attempt to remove the cached status for an image.  Returns true when the
// removal was applied synchronously (the caller then completes on_finish);
// returns false when the image is part of a scheduled or in-flight update,
// in which case a retry wrapper is queued and will eventually complete
// on_finish itself once the removal succeeds.
template <typename I>
bool MirrorStatusUpdater<I>::try_remove_mirror_image_status(
    const std::string& global_image_id, bool queue_update,
    bool immediate_update, Context* on_finish) {
  dout(15) << "global_image_id=" << global_image_id << ", "
           << "queue_update=" << queue_update << ", "
           << "immediate_update=" << immediate_update << dendl;

  std::unique_lock locker(m_lock);
  if ((m_update_in_flight &&
       m_updating_global_image_ids.count(global_image_id) > 0) ||
      ((m_update_in_progress || m_update_requested) &&
       m_update_global_image_ids.count(global_image_id) > 0)) {
    // if update is scheduled/in-progress, wait for it to complete
    // (re-invoke ourselves from the update's completion contexts; the
    // wrapper completes the original on_finish only on a synchronous retry)
    on_finish = new LambdaContext(
      [this, global_image_id, queue_update, immediate_update,
       on_finish](int r) {
        if (try_remove_mirror_image_status(global_image_id, queue_update,
                                           immediate_update, on_finish)) {
          on_finish->complete(0);
        }
      });
    m_update_on_finish_ctxs.push_back(on_finish);
    return false;
  }

  m_global_image_status.erase(global_image_id);
  if (queue_update) {
    // erased from the cache but still queued: update_task will translate
    // the missing cache entry into an on-disk status removal
    m_update_global_image_ids.insert(global_image_id);
    if (immediate_update) {
      // queue_update_task takes ownership of (and releases) the held lock
      queue_update_task(std::move(locker));
    }
  }

  return true;
}
248 | ||
// Arm the next periodic full-status flush.  Caller must hold the timer
// lock; at most one timer task may be pending at a time.
template <typename I>
void MirrorStatusUpdater<I>::schedule_timer_task() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(m_timer_task == nullptr);
  m_timer_task = create_context_callback<
    MirrorStatusUpdater<I>,
    &MirrorStatusUpdater<I>::handle_timer_task>(this);
  m_threads->timer->add_event_after(UPDATE_INTERVAL_SECONDS, m_timer_task);
}
260 | ||
261 | template <typename I> | |
262 | void MirrorStatusUpdater<I>::handle_timer_task(int r) { | |
263 | dout(10) << dendl; | |
264 | ||
265 | ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); | |
266 | ceph_assert(m_timer_task != nullptr); | |
267 | m_timer_task = nullptr; | |
268 | schedule_timer_task(); | |
269 | ||
270 | std::unique_lock locker(m_lock); | |
271 | for (auto& pair : m_global_image_status) { | |
272 | m_update_global_image_ids.insert(pair.first); | |
273 | } | |
274 | ||
275 | queue_update_task(std::move(locker)); | |
276 | } | |
277 | ||
// Schedule an update pass on the work queue, consuming the caller's held
// lock.  Uses two flags: m_update_in_progress (a pass is scheduled or
// running) and m_update_in_flight (its rados ops have been issued).
template <typename I>
void MirrorStatusUpdater<I>::queue_update_task(
    std::unique_lock<ceph::mutex>&& locker) {
  if (!m_initialized) {
    // shutting down -- no new update passes
    return;
  }

  if (m_update_in_progress) {
    if (m_update_in_flight) {
      // ops already issued; request a follow-up pass once they drain
      dout(10) << "deferring update due to in-flight ops" << dendl;
      m_update_requested = true;
    }
    // if scheduled but not yet in flight, update_task will pick up the
    // newly queued ids when it swaps the pending set
    return;
  }

  m_update_in_progress = true;
  ceph_assert(!m_update_in_flight);
  ceph_assert(!m_update_requested);
  // release before queueing to avoid holding m_lock across the work queue
  locker.unlock();

  dout(10) << dendl;
  auto ctx = create_context_callback<
    MirrorStatusUpdater<I>,
    &MirrorStatusUpdater<I>::update_task>(this);
  m_threads->work_queue->queue(ctx);
}
304 | ||
// Work-queue task that writes all pending statuses to the RBD_MIRRORING
// object, batching up to MAX_UPDATES_PER_OP calls per rados write op.
template <typename I>
void MirrorStatusUpdater<I>::update_task(int r) {
  dout(10) << dendl;

  std::unique_lock locker(m_lock);
  ceph_assert(m_update_in_progress);
  ceph_assert(!m_update_in_flight);
  m_update_in_flight = true;

  // claim the pending set (m_updating_global_image_ids is consulted by
  // try_remove_mirror_image_status while ops are in flight), then copy
  // the working data so the ops can be built without holding the lock
  std::swap(m_updating_global_image_ids, m_update_global_image_ids);
  auto updating_global_image_ids = m_updating_global_image_ids;
  auto global_image_status = m_global_image_status;
  locker.unlock();

  Context* ctx = create_context_callback<
    MirrorStatusUpdater<I>,
    &MirrorStatusUpdater<I>::handle_update_task>(this);
  if (updating_global_image_ids.empty()) {
    // nothing pending -- C_Gather requires at least one sub, so complete
    // the callback directly
    ctx->complete(0);
    return;
  }

  auto gather = new C_Gather(g_ceph_context, ctx);

  auto it = updating_global_image_ids.begin();
  while (it != updating_global_image_ids.end()) {
    librados::ObjectWriteOperation op;
    uint32_t op_count = 0;

    while (it != updating_global_image_ids.end() &&
           op_count < MAX_UPDATES_PER_OP) {
      auto& global_image_id = *it;
      ++it;

      auto status_it = global_image_status.find(global_image_id);
      if (status_it == global_image_status.end()) {
        // queued but no longer cached: translate into an on-disk removal
        librbd::cls_client::mirror_image_status_remove(&op, global_image_id);
        ++op_count;
        continue;
      }

      // stamp each status with the local site's mirror uuid before writing
      status_it->second.mirror_uuid = m_local_mirror_uuid;
      librbd::cls_client::mirror_image_status_set(&op, global_image_id,
                                                  status_it->second);
      ++op_count;
    }

    auto aio_comp = create_rados_callback(gather->new_sub());
    int r = m_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
    ceph_assert(r == 0);
    aio_comp->release();
  }

  gather->activate();
}
360 | ||
// Completion of an update pass: reset the progress/in-flight flags, start
// a follow-up pass if one was requested while ops were in flight, and fire
// any contexts that were waiting for this pass to drain.
template <typename I>
void MirrorStatusUpdater<I>::handle_update_task(int r) {
  dout(10) << dendl;
  if (r < 0) {
    derr << "failed to update mirror image statuses: " << cpp_strerror(r)
         << dendl;
  }

  std::unique_lock locker(m_lock);

  Contexts on_finish_ctxs;
  std::swap(on_finish_ctxs, m_update_on_finish_ctxs);

  ceph_assert(m_update_in_progress);
  m_update_in_progress = false;

  ceph_assert(m_update_in_flight);
  m_update_in_flight = false;

  m_updating_global_image_ids.clear();

  if (m_update_requested) {
    // another pass was requested while ops were in flight;
    // queue_update_task consumes (and releases) the held lock
    m_update_requested = false;
    queue_update_task(std::move(locker));
  } else {
    locker.unlock();
  }

  // complete waiters outside the lock; always 0 here -- error propagation
  // is handled by wrapper contexts queued by the callers (see
  // finalize_shutdown / try_remove_mirror_image_status)
  for (auto on_finish : on_finish_ctxs) {
    on_finish->complete(0);
  }
}
393 | ||
} // namespace mirror
} // namespace rbd

// Explicit instantiation for the production librbd image context type.
template class rbd::mirror::MirrorStatusUpdater<librbd::ImageCtx>;