]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 SUSE LINUX GmbH | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "ImageSyncThrottler.h" | |
16 | #include "ImageSync.h" | |
17 | #include "common/ceph_context.h" | |
18 | ||
19 | #define dout_context g_ceph_context | |
20 | #define dout_subsys ceph_subsys_rbd_mirror | |
21 | #undef dout_prefix | |
22 | #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ | |
23 | << " " << __func__ << ": " | |
24 | using std::unique_ptr; | |
25 | using std::string; | |
26 | using std::set; | |
27 | ||
28 | namespace rbd { | |
29 | namespace mirror { | |
30 | ||
// Completion context that wraps one queued or in-flight image sync.
// When the wrapped ImageSync completes, it first notifies the throttler
// (so a waiting sync can be promoted) and then chains to the caller's
// on_finish context.  The holder itself is deleted by Context::complete().
template <typename ImageCtxT>
struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
  ImageSyncThrottler<ImageCtxT> *m_sync_throttler;  // owning throttler (not owned here)
  PoolImageId m_local_pool_image_id;                // (pool id, image id) key into m_inflight_syncs
  ImageSync<ImageCtxT> *m_sync = nullptr;           // ref-counted sync state machine
  Context *m_on_finish;                             // caller's completion, fired after bookkeeping

  C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
               const PoolImageId &local_pool_image_id, Context *on_finish)
    : m_sync_throttler(sync_throttler),
      m_local_pool_image_id(local_pool_image_id), m_on_finish(on_finish) {
  }

  // Invoked when the sync finishes (success, error, or cancel).
  // NOTE(review): start_sync() takes an extra reference via m_sync->get(),
  // but no matching m_sync->put() is visible on this completion path --
  // confirm against ImageSync's own lifecycle that the ref is not leaked.
  void finish(int r) override {
    m_sync_throttler->handle_sync_finished(this);
    m_on_finish->complete(r);
  }
};
49 | ||
// Construct the throttler: seed the concurrency limit from the
// rbd_mirror_concurrent_image_syncs config option and register as a
// config observer so runtime changes to that option are picked up.
template <typename I>
ImageSyncThrottler<I>::ImageSyncThrottler()
  : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
    m_lock("rbd::mirror::ImageSyncThrottler")
{
  dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
           << dendl;
  // NOTE(review): registering `this` as an observer makes the object
  // reachable from the config-change path while still in the constructor;
  // all members are initialized above, but confirm no callback can race in
  // before construction returns.
  g_ceph_context->_conf->add_observer(this);
}
59 | ||
// Tear down the throttler.  The caller must have drained all queued and
// in-flight syncs first; destroying with outstanding work is a bug
// (enforced by the asserts below).
template <typename I>
ImageSyncThrottler<I>::~ImageSyncThrottler() {
  {
    Mutex::Locker l(m_lock);
    assert(m_sync_queue.empty());
    assert(m_inflight_syncs.empty());
  }

  // deregister last; no further handle_conf_change() callbacks can be
  // delivered once this returns
  g_ceph_context->_conf->remove_observer(this);
}
70 | ||
// Create an ImageSync for the given local/remote image pair and either
// start it immediately (if a concurrency slot is free) or queue it until
// a running sync finishes.  `on_finish` is completed when the sync
// finishes, fails, or is cancelled.
template <typename I>
void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
                                       SafeTimer *timer, Mutex *timer_lock,
                                       const std::string &mirror_uuid,
                                       Journaler *journaler,
                                       MirrorPeerClientMeta *client_meta,
                                       ContextWQ *work_queue,
                                       Context *on_finish,
                                       ProgressContext *progress_ctx) {
  dout(20) << dendl;

  // key used later by cancel_sync(): (local pool id, local image id)
  PoolImageId pool_image_id(local_image_ctx->md_ctx.get_id(),
                            local_image_ctx->id);
  C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, pool_image_id,
                                                   on_finish);
  sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
                                                 remote_image_ctx, timer,
                                                 timer_lock, mirror_uuid,
                                                 journaler, client_meta,
                                                 work_queue, sync_holder_ctx,
                                                 progress_ctx);
  // extra reference so the sync object stays valid for cancel_sync()
  sync_holder_ctx->m_sync->get();

  bool start = false;
  {
    Mutex::Locker l(m_lock);

    if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
      assert(m_inflight_syncs.count(pool_image_id) == 0);
      m_inflight_syncs[pool_image_id] = sync_holder_ctx;
      start = true;
      dout(10) << "ready to start image sync for local_image_id "
               << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
               << m_max_concurrent_syncs << "]" << dendl;
    } else {
      // push_front here, pop_back on dequeue => FIFO ordering of waiters
      m_sync_queue.push_front(sync_holder_ctx);
      dout(10) << "image sync for local_image_id " << local_image_ctx->id
               << " has been queued" << dendl;
    }
  }

  // send() outside the lock: handle_sync_finished() re-acquires m_lock,
  // so starting the sync while holding it could self-deadlock
  if (start) {
    sync_holder_ctx->m_sync->send();
  }
}
116 | ||
// Cancel the sync (running or queued) for the given local image, if any.
// A queued sync is completed immediately with -ECANCELED; a running sync
// is asked to cancel and completes through its normal finish path.
template <typename I>
void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
                                        const std::string local_image_id) {
  dout(20) << dendl;

  C_SyncHolder *sync_holder = nullptr;
  bool running_sync = true;

  {
    Mutex::Locker l(m_lock);
    if (m_inflight_syncs.empty()) {
      // no image sync currently running and neither waiting
      // (waiters only exist while all in-flight slots are occupied, so an
      // empty in-flight map implies an empty queue -- presumed invariant)
      return;
    }

    PoolImageId local_pool_image_id(local_io_ctx.get_id(),
                                    local_image_id);
    // first look among the running syncs...
    auto it = m_inflight_syncs.find(local_pool_image_id);
    if (it != m_inflight_syncs.end()) {
      sync_holder = it->second;
    }

    if (!sync_holder) {
      // ...then among the waiters; unlink it from the queue if found
      for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
        if ((*it)->m_local_pool_image_id == local_pool_image_id) {
          sync_holder = (*it);
          m_sync_queue.erase(it);
          running_sync = false;
          break;
        }
      }
    }
  }

  // completion/cancel delivered outside the lock
  if (sync_holder) {
    if (running_sync) {
      // asynchronous: completion still arrives via C_SyncHolder::finish()
      dout(10) << "canceled running image sync for local_image_id "
               << sync_holder->m_local_pool_image_id.second << dendl;
      sync_holder->m_sync->cancel();
    } else {
      // the sync never started: complete the caller now and drop our ref
      dout(10) << "canceled waiting image sync for local_image_id "
               << sync_holder->m_local_pool_image_id.second << dendl;
      sync_holder->m_on_finish->complete(-ECANCELED);
      sync_holder->m_sync->put();
      delete sync_holder;
    }
  }
}
165 | ||
// Called from C_SyncHolder::finish() when a sync completes: release its
// concurrency slot and, if a waiter exists and a slot is now free,
// promote the oldest waiter to in-flight and start it.
template <typename I>
void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
  dout(20) << dendl;

  C_SyncHolder *next_sync_holder = nullptr;

  {
    Mutex::Locker l(m_lock);
    m_inflight_syncs.erase(sync_holder->m_local_pool_image_id);

    if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
        !m_sync_queue.empty()) {
      // queue is push_front/pop_back, so back() is the oldest waiter
      next_sync_holder = m_sync_queue.back();
      m_sync_queue.pop_back();

      assert(
        m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
      m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
        next_sync_holder;
      dout(10) << "ready to start image sync for local_image_id "
               << next_sync_holder->m_local_pool_image_id.second
               << " [" << m_inflight_syncs.size() << "/"
               << m_max_concurrent_syncs << "]" << dendl;
    }

    dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
             << "/" << m_max_concurrent_syncs << "]" << dendl;
  }

  // start the promoted sync outside the lock, since its completion path
  // re-enters this function and re-acquires m_lock
  if (next_sync_holder) {
    next_sync_holder->m_sync->send();
  }
}
199 | ||
// Update the concurrency limit at runtime (driven by handle_conf_change).
// If the limit grew, promote as many queued syncs as now fit and start
// them outside the lock.  Lowering the limit never cancels running syncs;
// the excess simply drains as they finish.
template <typename I>
void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
  dout(20) << " max=" << max << dendl;

  assert(max > 0);

  // syncs to start, collected under the lock but sent outside it
  std::list<C_SyncHolder *> next_sync_holders;
  {
    Mutex::Locker l(m_lock);
    this->m_max_concurrent_syncs = max;

    // Start waiting syncs in the case of available free slots
    while(m_inflight_syncs.size() < m_max_concurrent_syncs
          && !m_sync_queue.empty()) {
      // back() is the oldest waiter (queue is push_front/pop_back)
      C_SyncHolder *next_sync_holder = m_sync_queue.back();
      next_sync_holders.push_back(next_sync_holder);
      m_sync_queue.pop_back();

      assert(
        m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
      m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
        next_sync_holder;

      dout(10) << "ready to start image sync for local_image_id "
               << next_sync_holder->m_local_pool_image_id.second
               << " [" << m_inflight_syncs.size() << "/"
               << m_max_concurrent_syncs << "]" << dendl;
    }
  }

  // send() outside the lock so a synchronously-completing sync can
  // safely re-acquire m_lock on its completion path
  for (const auto& sync_holder : next_sync_holders) {
    sync_holder->m_sync->send();
  }
}
234 | ||
235 | template <typename I> | |
236 | void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) { | |
237 | Mutex::Locker l(m_lock); | |
238 | ||
239 | if (f) { | |
240 | f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); | |
241 | f->dump_int("running_syncs", m_inflight_syncs.size()); | |
242 | f->dump_int("waiting_syncs", m_sync_queue.size()); | |
243 | f->flush(*ss); | |
244 | } else { | |
245 | *ss << "[ "; | |
246 | *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; | |
247 | *ss << "running_syncs=" << m_inflight_syncs.size() << ", "; | |
248 | *ss << "waiting_syncs=" << m_sync_queue.size() << " ]"; | |
249 | } | |
250 | } | |
251 | ||
252 | template <typename I> | |
253 | const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const { | |
254 | static const char* KEYS[] = { | |
255 | "rbd_mirror_concurrent_image_syncs", | |
256 | NULL | |
257 | }; | |
258 | return KEYS; | |
259 | } | |
260 | ||
261 | template <typename I> | |
262 | void ImageSyncThrottler<I>::handle_conf_change( | |
263 | const struct md_config_t *conf, | |
264 | const set<string> &changed) { | |
265 | if (changed.count("rbd_mirror_concurrent_image_syncs")) { | |
266 | set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs); | |
267 | } | |
268 | } | |
269 | ||
270 | } // namespace mirror | |
271 | } // namespace rbd | |
272 | ||
273 | template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>; |