]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / tools / rbd_mirror / ImageSyncThrottler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 SUSE LINUX GmbH
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "ImageSyncThrottler.h"
16 #include "ImageSync.h"
17 #include "common/ceph_context.h"
18
19 #define dout_context g_ceph_context
20 #define dout_subsys ceph_subsys_rbd_mirror
21 #undef dout_prefix
22 #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
23 << " " << __func__ << ": "
24 using std::unique_ptr;
25 using std::string;
26 using std::set;
27
28 namespace rbd {
29 namespace mirror {
30
31 template <typename ImageCtxT>
32 struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
33 ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
34 PoolImageId m_local_pool_image_id;
35 ImageSync<ImageCtxT> *m_sync = nullptr;
36 Context *m_on_finish;
37
38 C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
39 const PoolImageId &local_pool_image_id, Context *on_finish)
40 : m_sync_throttler(sync_throttler),
41 m_local_pool_image_id(local_pool_image_id), m_on_finish(on_finish) {
42 }
43
44 void finish(int r) override {
45 m_sync_throttler->handle_sync_finished(this);
46 m_on_finish->complete(r);
47 }
48 };
49
50 template <typename I>
51 ImageSyncThrottler<I>::ImageSyncThrottler()
52 : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
53 m_lock("rbd::mirror::ImageSyncThrottler")
54 {
55 dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
56 << dendl;
57 g_ceph_context->_conf->add_observer(this);
58 }
59
60 template <typename I>
61 ImageSyncThrottler<I>::~ImageSyncThrottler() {
62 {
63 Mutex::Locker l(m_lock);
64 assert(m_sync_queue.empty());
65 assert(m_inflight_syncs.empty());
66 }
67
68 g_ceph_context->_conf->remove_observer(this);
69 }
70
71 template <typename I>
72 void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
73 SafeTimer *timer, Mutex *timer_lock,
74 const std::string &mirror_uuid,
75 Journaler *journaler,
76 MirrorPeerClientMeta *client_meta,
77 ContextWQ *work_queue,
78 Context *on_finish,
79 ProgressContext *progress_ctx) {
80 dout(20) << dendl;
81
82 PoolImageId pool_image_id(local_image_ctx->md_ctx.get_id(),
83 local_image_ctx->id);
84 C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, pool_image_id,
85 on_finish);
86 sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
87 remote_image_ctx, timer,
88 timer_lock, mirror_uuid,
89 journaler, client_meta,
90 work_queue, sync_holder_ctx,
91 progress_ctx);
92 sync_holder_ctx->m_sync->get();
93
94 bool start = false;
95 {
96 Mutex::Locker l(m_lock);
97
98 if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
99 assert(m_inflight_syncs.count(pool_image_id) == 0);
100 m_inflight_syncs[pool_image_id] = sync_holder_ctx;
101 start = true;
102 dout(10) << "ready to start image sync for local_image_id "
103 << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
104 << m_max_concurrent_syncs << "]" << dendl;
105 } else {
106 m_sync_queue.push_front(sync_holder_ctx);
107 dout(10) << "image sync for local_image_id " << local_image_ctx->id
108 << " has been queued" << dendl;
109 }
110 }
111
112 if (start) {
113 sync_holder_ctx->m_sync->send();
114 }
115 }
116
117 template <typename I>
118 void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
119 const std::string local_image_id) {
120 dout(20) << dendl;
121
122 C_SyncHolder *sync_holder = nullptr;
123 bool running_sync = true;
124
125 {
126 Mutex::Locker l(m_lock);
127 if (m_inflight_syncs.empty()) {
128 // no image sync currently running and neither waiting
129 return;
130 }
131
132 PoolImageId local_pool_image_id(local_io_ctx.get_id(),
133 local_image_id);
134 auto it = m_inflight_syncs.find(local_pool_image_id);
135 if (it != m_inflight_syncs.end()) {
136 sync_holder = it->second;
137 }
138
139 if (!sync_holder) {
140 for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
141 if ((*it)->m_local_pool_image_id == local_pool_image_id) {
142 sync_holder = (*it);
143 m_sync_queue.erase(it);
144 running_sync = false;
145 break;
146 }
147 }
148 }
149 }
150
151 if (sync_holder) {
152 if (running_sync) {
153 dout(10) << "canceled running image sync for local_image_id "
154 << sync_holder->m_local_pool_image_id.second << dendl;
155 sync_holder->m_sync->cancel();
156 } else {
157 dout(10) << "canceled waiting image sync for local_image_id "
158 << sync_holder->m_local_pool_image_id.second << dendl;
159 sync_holder->m_on_finish->complete(-ECANCELED);
160 sync_holder->m_sync->put();
161 delete sync_holder;
162 }
163 }
164 }
165
166 template <typename I>
167 void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
168 dout(20) << dendl;
169
170 C_SyncHolder *next_sync_holder = nullptr;
171
172 {
173 Mutex::Locker l(m_lock);
174 m_inflight_syncs.erase(sync_holder->m_local_pool_image_id);
175
176 if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
177 !m_sync_queue.empty()) {
178 next_sync_holder = m_sync_queue.back();
179 m_sync_queue.pop_back();
180
181 assert(
182 m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
183 m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
184 next_sync_holder;
185 dout(10) << "ready to start image sync for local_image_id "
186 << next_sync_holder->m_local_pool_image_id.second
187 << " [" << m_inflight_syncs.size() << "/"
188 << m_max_concurrent_syncs << "]" << dendl;
189 }
190
191 dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
192 << "/" << m_max_concurrent_syncs << "]" << dendl;
193 }
194
195 if (next_sync_holder) {
196 next_sync_holder->m_sync->send();
197 }
198 }
199
200 template <typename I>
201 void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
202 dout(20) << " max=" << max << dendl;
203
204 assert(max > 0);
205
206 std::list<C_SyncHolder *> next_sync_holders;
207 {
208 Mutex::Locker l(m_lock);
209 this->m_max_concurrent_syncs = max;
210
211 // Start waiting syncs in the case of available free slots
212 while(m_inflight_syncs.size() < m_max_concurrent_syncs
213 && !m_sync_queue.empty()) {
214 C_SyncHolder *next_sync_holder = m_sync_queue.back();
215 next_sync_holders.push_back(next_sync_holder);
216 m_sync_queue.pop_back();
217
218 assert(
219 m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
220 m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
221 next_sync_holder;
222
223 dout(10) << "ready to start image sync for local_image_id "
224 << next_sync_holder->m_local_pool_image_id.second
225 << " [" << m_inflight_syncs.size() << "/"
226 << m_max_concurrent_syncs << "]" << dendl;
227 }
228 }
229
230 for (const auto& sync_holder : next_sync_holders) {
231 sync_holder->m_sync->send();
232 }
233 }
234
235 template <typename I>
236 void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
237 Mutex::Locker l(m_lock);
238
239 if (f) {
240 f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
241 f->dump_int("running_syncs", m_inflight_syncs.size());
242 f->dump_int("waiting_syncs", m_sync_queue.size());
243 f->flush(*ss);
244 } else {
245 *ss << "[ ";
246 *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
247 *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
248 *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
249 }
250 }
251
252 template <typename I>
253 const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
254 static const char* KEYS[] = {
255 "rbd_mirror_concurrent_image_syncs",
256 NULL
257 };
258 return KEYS;
259 }
260
261 template <typename I>
262 void ImageSyncThrottler<I>::handle_conf_change(
263 const struct md_config_t *conf,
264 const set<string> &changed) {
265 if (changed.count("rbd_mirror_concurrent_image_syncs")) {
266 set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
267 }
268 }
269
270 } // namespace mirror
271 } // namespace rbd
272
273 template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;