]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 SUSE LINUX GmbH | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "ImageSyncThrottler.h" | |
31f18b77 FG |
16 | #include "common/Formatter.h" |
17 | #include "common/debug.h" | |
18 | #include "common/errno.h" | |
19 | #include "librbd/Utils.h" | |
7c673cae FG |
20 | |
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_rbd_mirror | |
23 | #undef dout_prefix | |
24 | #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ | |
25 | << " " << __func__ << ": " | |
7c673cae FG |
26 | |
27 | namespace rbd { | |
28 | namespace mirror { | |
29 | ||
7c673cae FG |
30 | template <typename I> |
31 | ImageSyncThrottler<I>::ImageSyncThrottler() | |
31f18b77 FG |
32 | : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", |
33 | this)), | |
34 | m_max_concurrent_syncs( | |
35 | g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) { | |
36 | dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl; | |
7c673cae FG |
37 | g_ceph_context->_conf->add_observer(this); |
38 | } | |
39 | ||
40 | template <typename I> | |
41 | ImageSyncThrottler<I>::~ImageSyncThrottler() { | |
7c673cae | 42 | g_ceph_context->_conf->remove_observer(this); |
31f18b77 FG |
43 | |
44 | Mutex::Locker locker(m_lock); | |
45 | assert(m_inflight_ops.empty()); | |
46 | assert(m_queue.empty()); | |
7c673cae FG |
47 | } |
48 | ||
49 | template <typename I> | |
31f18b77 FG |
50 | void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) { |
51 | dout(20) << "id=" << id << dendl; | |
7c673cae | 52 | |
7c673cae | 53 | { |
31f18b77 FG |
54 | Mutex::Locker locker(m_lock); |
55 | ||
56 | if (m_inflight_ops.count(id) > 0) { | |
57 | dout(20) << "duplicate for already started op " << id << dendl; | |
58 | } else if (m_max_concurrent_syncs == 0 || | |
59 | m_inflight_ops.size() < m_max_concurrent_syncs) { | |
60 | assert(m_queue.empty()); | |
61 | m_inflight_ops.insert(id); | |
62 | dout(20) << "ready to start sync for " << id << " [" | |
63 | << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" | |
64 | << dendl; | |
7c673cae | 65 | } else { |
31f18b77 FG |
66 | m_queue.push_back(std::make_pair(id, on_start)); |
67 | on_start = nullptr; | |
68 | dout(20) << "image sync for " << id << " has been queued" << dendl; | |
7c673cae FG |
69 | } |
70 | } | |
71 | ||
31f18b77 FG |
72 | if (on_start != nullptr) { |
73 | on_start->complete(0); | |
7c673cae FG |
74 | } |
75 | } | |
76 | ||
77 | template <typename I> | |
31f18b77 FG |
78 | bool ImageSyncThrottler<I>::cancel_op(const std::string &id) { |
79 | dout(20) << "id=" << id << dendl; | |
7c673cae | 80 | |
31f18b77 | 81 | Context *on_start = nullptr; |
7c673cae | 82 | { |
31f18b77 FG |
83 | Mutex::Locker locker(m_lock); |
84 | for (auto it = m_queue.begin(); it != m_queue.end(); ++it) { | |
85 | if (it->first == id) { | |
86 | on_start = it->second; | |
87 | dout(20) << "canceled queued sync for " << id << dendl; | |
88 | m_queue.erase(it); | |
89 | break; | |
7c673cae FG |
90 | } |
91 | } | |
92 | } | |
93 | ||
31f18b77 FG |
94 | if (on_start == nullptr) { |
95 | return false; | |
7c673cae | 96 | } |
31f18b77 FG |
97 | |
98 | on_start->complete(-ECANCELED); | |
99 | return true; | |
7c673cae FG |
100 | } |
101 | ||
102 | template <typename I> | |
31f18b77 FG |
103 | void ImageSyncThrottler<I>::finish_op(const std::string &id) { |
104 | dout(20) << "id=" << id << dendl; | |
7c673cae | 105 | |
31f18b77 FG |
106 | if (cancel_op(id)) { |
107 | return; | |
108 | } | |
7c673cae | 109 | |
31f18b77 | 110 | Context *on_start = nullptr; |
7c673cae | 111 | { |
31f18b77 FG |
112 | Mutex::Locker locker(m_lock); |
113 | ||
114 | m_inflight_ops.erase(id); | |
115 | ||
116 | if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { | |
117 | auto pair = m_queue.front(); | |
118 | m_inflight_ops.insert(pair.first); | |
119 | dout(20) << "ready to start sync for " << pair.first << " [" | |
120 | << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" | |
121 | << dendl; | |
122 | on_start= pair.second; | |
123 | m_queue.pop_front(); | |
7c673cae | 124 | } |
31f18b77 | 125 | } |
7c673cae | 126 | |
31f18b77 FG |
127 | if (on_start != nullptr) { |
128 | on_start->complete(0); | |
7c673cae | 129 | } |
31f18b77 | 130 | } |
7c673cae | 131 | |
31f18b77 FG |
132 | template <typename I> |
133 | void ImageSyncThrottler<I>::drain(int r) { | |
134 | dout(20) << dendl; | |
135 | ||
136 | std::list<std::pair<std::string, Context *>> queue; | |
137 | { | |
138 | Mutex::Locker locker(m_lock); | |
139 | std::swap(m_queue, queue); | |
140 | m_inflight_ops.clear(); | |
141 | } | |
142 | ||
143 | for (auto &pair : queue) { | |
144 | pair.second->complete(r); | |
7c673cae FG |
145 | } |
146 | } | |
147 | ||
148 | template <typename I> | |
149 | void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) { | |
31f18b77 | 150 | dout(20) << "max=" << max << dendl; |
7c673cae | 151 | |
31f18b77 | 152 | std::list<Context *> ops; |
7c673cae | 153 | { |
31f18b77 FG |
154 | Mutex::Locker locker(m_lock); |
155 | m_max_concurrent_syncs = max; | |
156 | ||
157 | // Start waiting ops in the case of available free slots | |
158 | while ((m_max_concurrent_syncs == 0 || | |
159 | m_inflight_ops.size() < m_max_concurrent_syncs) && | |
160 | !m_queue.empty()) { | |
161 | auto pair = m_queue.front(); | |
162 | m_inflight_ops.insert(pair.first); | |
163 | dout(20) << "ready to start sync for " << pair.first << " [" | |
164 | << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" | |
165 | << dendl; | |
166 | ops.push_back(pair.second); | |
167 | m_queue.pop_front(); | |
7c673cae FG |
168 | } |
169 | } | |
170 | ||
31f18b77 FG |
171 | for (const auto& ctx : ops) { |
172 | ctx->complete(0); | |
7c673cae FG |
173 | } |
174 | } | |
175 | ||
176 | template <typename I> | |
31f18b77 FG |
177 | void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) { |
178 | dout(20) << dendl; | |
179 | ||
180 | Mutex::Locker locker(m_lock); | |
7c673cae FG |
181 | |
182 | if (f) { | |
183 | f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); | |
31f18b77 FG |
184 | f->dump_int("running_syncs", m_inflight_ops.size()); |
185 | f->dump_int("waiting_syncs", m_queue.size()); | |
7c673cae FG |
186 | f->flush(*ss); |
187 | } else { | |
188 | *ss << "[ "; | |
189 | *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; | |
31f18b77 FG |
190 | *ss << "running_syncs=" << m_inflight_ops.size() << ", "; |
191 | *ss << "waiting_syncs=" << m_queue.size() << " ]"; | |
7c673cae FG |
192 | } |
193 | } | |
194 | ||
195 | template <typename I> | |
196 | const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const { | |
197 | static const char* KEYS[] = { | |
198 | "rbd_mirror_concurrent_image_syncs", | |
199 | NULL | |
200 | }; | |
201 | return KEYS; | |
202 | } | |
203 | ||
204 | template <typename I> | |
31f18b77 FG |
205 | void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf, |
206 | const set<string> &changed) { | |
7c673cae FG |
207 | if (changed.count("rbd_mirror_concurrent_image_syncs")) { |
208 | set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs); | |
209 | } | |
210 | } | |
211 | ||
212 | } // namespace mirror | |
213 | } // namespace rbd | |
214 | ||
215 | template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>; |