]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 SUSE LINUX GmbH | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
9f95a23c | 15 | #include "Throttler.h" |
31f18b77 FG |
16 | #include "common/Formatter.h" |
17 | #include "common/debug.h" | |
18 | #include "common/errno.h" | |
19 | #include "librbd/Utils.h" | |
7c673cae FG |
20 | |
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_rbd_mirror | |
23 | #undef dout_prefix | |
9f95a23c | 24 | #define dout_prefix *_dout << "rbd::mirror::Throttler:: " << this \ |
7c673cae | 25 | << " " << __func__ << ": " |
7c673cae FG |
26 | |
27 | namespace rbd { | |
28 | namespace mirror { | |
29 | ||
7c673cae | 30 | template <typename I> |
9f95a23c TL |
31 | Throttler<I>::Throttler(CephContext *cct, const std::string &config_key) |
32 | : m_cct(cct), m_config_key(config_key), | |
33 | m_config_keys{m_config_key.c_str(), nullptr}, | |
34 | m_lock(ceph::make_mutex( | |
35 | librbd::util::unique_lock_name("rbd::mirror::Throttler", this))), | |
36 | m_max_concurrent_ops(cct->_conf.get_val<uint64_t>(m_config_key)) { | |
37 | dout(20) << m_config_key << "=" << m_max_concurrent_ops << dendl; | |
11fdf7f2 | 38 | m_cct->_conf.add_observer(this); |
7c673cae FG |
39 | } |
40 | ||
41 | template <typename I> | |
9f95a23c | 42 | Throttler<I>::~Throttler() { |
11fdf7f2 | 43 | m_cct->_conf.remove_observer(this); |
31f18b77 | 44 | |
9f95a23c | 45 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
46 | ceph_assert(m_inflight_ops.empty()); |
47 | ceph_assert(m_queue.empty()); | |
7c673cae FG |
48 | } |
49 | ||
50 | template <typename I> | |
9f95a23c TL |
51 | void Throttler<I>::start_op(const std::string &ns, |
52 | const std::string &id_, | |
53 | Context *on_start) { | |
54 | Id id{ns, id_}; | |
55 | ||
31f18b77 | 56 | dout(20) << "id=" << id << dendl; |
7c673cae | 57 | |
494da23a | 58 | int r = 0; |
7c673cae | 59 | { |
9f95a23c | 60 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
61 | |
62 | if (m_inflight_ops.count(id) > 0) { | |
63 | dout(20) << "duplicate for already started op " << id << dendl; | |
494da23a TL |
64 | } else if (m_queued_ops.count(id) > 0) { |
65 | dout(20) << "duplicate for already queued op " << id << dendl; | |
66 | std::swap(m_queued_ops[id], on_start); | |
67 | r = -ENOENT; | |
9f95a23c TL |
68 | } else if (m_max_concurrent_ops == 0 || |
69 | m_inflight_ops.size() < m_max_concurrent_ops) { | |
11fdf7f2 | 70 | ceph_assert(m_queue.empty()); |
31f18b77 | 71 | m_inflight_ops.insert(id); |
9f95a23c TL |
72 | dout(20) << "ready to start op for " << id << " [" |
73 | << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]" | |
31f18b77 | 74 | << dendl; |
7c673cae | 75 | } else { |
494da23a TL |
76 | m_queue.push_back(id); |
77 | std::swap(m_queued_ops[id], on_start); | |
9f95a23c | 78 | dout(20) << "op for " << id << " has been queued" << dendl; |
7c673cae FG |
79 | } |
80 | } | |
81 | ||
31f18b77 | 82 | if (on_start != nullptr) { |
494da23a | 83 | on_start->complete(r); |
7c673cae FG |
84 | } |
85 | } | |
86 | ||
87 | template <typename I> | |
9f95a23c TL |
88 | bool Throttler<I>::cancel_op(const std::string &ns, |
89 | const std::string &id_) { | |
90 | Id id{ns, id_}; | |
91 | ||
31f18b77 | 92 | dout(20) << "id=" << id << dendl; |
7c673cae | 93 | |
31f18b77 | 94 | Context *on_start = nullptr; |
7c673cae | 95 | { |
9f95a23c | 96 | std::lock_guard locker{m_lock}; |
494da23a TL |
97 | auto it = m_queued_ops.find(id); |
98 | if (it != m_queued_ops.end()) { | |
9f95a23c | 99 | dout(20) << "canceled queued op for " << id << dendl; |
494da23a TL |
100 | m_queue.remove(id); |
101 | on_start = it->second; | |
102 | m_queued_ops.erase(it); | |
7c673cae FG |
103 | } |
104 | } | |
105 | ||
31f18b77 FG |
106 | if (on_start == nullptr) { |
107 | return false; | |
7c673cae | 108 | } |
31f18b77 FG |
109 | |
110 | on_start->complete(-ECANCELED); | |
111 | return true; | |
7c673cae FG |
112 | } |
113 | ||
114 | template <typename I> | |
9f95a23c TL |
115 | void Throttler<I>::finish_op(const std::string &ns, |
116 | const std::string &id_) { | |
117 | Id id{ns, id_}; | |
118 | ||
31f18b77 | 119 | dout(20) << "id=" << id << dendl; |
7c673cae | 120 | |
9f95a23c | 121 | if (cancel_op(ns, id_)) { |
31f18b77 FG |
122 | return; |
123 | } | |
7c673cae | 124 | |
31f18b77 | 125 | Context *on_start = nullptr; |
7c673cae | 126 | { |
9f95a23c | 127 | std::lock_guard locker{m_lock}; |
31f18b77 FG |
128 | |
129 | m_inflight_ops.erase(id); | |
130 | ||
9f95a23c | 131 | if (m_inflight_ops.size() < m_max_concurrent_ops && !m_queue.empty()) { |
494da23a TL |
132 | auto id = m_queue.front(); |
133 | auto it = m_queued_ops.find(id); | |
134 | ceph_assert(it != m_queued_ops.end()); | |
135 | m_inflight_ops.insert(id); | |
9f95a23c TL |
136 | dout(20) << "ready to start op for " << id << " [" |
137 | << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]" | |
31f18b77 | 138 | << dendl; |
494da23a TL |
139 | on_start = it->second; |
140 | m_queued_ops.erase(it); | |
31f18b77 | 141 | m_queue.pop_front(); |
7c673cae | 142 | } |
31f18b77 | 143 | } |
7c673cae | 144 | |
31f18b77 FG |
145 | if (on_start != nullptr) { |
146 | on_start->complete(0); | |
7c673cae | 147 | } |
31f18b77 | 148 | } |
7c673cae | 149 | |
31f18b77 | 150 | template <typename I> |
9f95a23c TL |
151 | void Throttler<I>::drain(const std::string &ns, int r) { |
152 | dout(20) << "ns=" << ns << dendl; | |
31f18b77 | 153 | |
9f95a23c | 154 | std::map<Id, Context *> queued_ops; |
31f18b77 | 155 | { |
9f95a23c TL |
156 | std::lock_guard locker{m_lock}; |
157 | for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) { | |
158 | if (it->first.first == ns) { | |
159 | queued_ops[it->first] = it->second; | |
160 | m_queue.remove(it->first); | |
161 | it = m_queued_ops.erase(it); | |
162 | } else { | |
163 | it++; | |
164 | } | |
165 | } | |
166 | for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) { | |
167 | if (it->first == ns) { | |
168 | dout(20) << "inflight_op " << *it << dendl; | |
169 | it = m_inflight_ops.erase(it); | |
170 | } else { | |
171 | it++; | |
172 | } | |
173 | } | |
31f18b77 FG |
174 | } |
175 | ||
494da23a | 176 | for (auto &it : queued_ops) { |
9f95a23c | 177 | dout(20) << "queued_op " << it.first << dendl; |
494da23a | 178 | it.second->complete(r); |
7c673cae FG |
179 | } |
180 | } | |
181 | ||
182 | template <typename I> | |
9f95a23c | 183 | void Throttler<I>::set_max_concurrent_ops(uint32_t max) { |
31f18b77 | 184 | dout(20) << "max=" << max << dendl; |
7c673cae | 185 | |
31f18b77 | 186 | std::list<Context *> ops; |
7c673cae | 187 | { |
9f95a23c TL |
188 | std::lock_guard locker{m_lock}; |
189 | m_max_concurrent_ops = max; | |
31f18b77 FG |
190 | |
191 | // Start waiting ops in the case of available free slots | |
9f95a23c TL |
192 | while ((m_max_concurrent_ops == 0 || |
193 | m_inflight_ops.size() < m_max_concurrent_ops) && | |
31f18b77 | 194 | !m_queue.empty()) { |
494da23a TL |
195 | auto id = m_queue.front(); |
196 | m_inflight_ops.insert(id); | |
9f95a23c TL |
197 | dout(20) << "ready to start op for " << id << " [" |
198 | << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]" | |
31f18b77 | 199 | << dendl; |
494da23a TL |
200 | auto it = m_queued_ops.find(id); |
201 | ceph_assert(it != m_queued_ops.end()); | |
202 | ops.push_back(it->second); | |
203 | m_queued_ops.erase(it); | |
31f18b77 | 204 | m_queue.pop_front(); |
7c673cae FG |
205 | } |
206 | } | |
207 | ||
31f18b77 FG |
208 | for (const auto& ctx : ops) { |
209 | ctx->complete(0); | |
7c673cae FG |
210 | } |
211 | } | |
212 | ||
213 | template <typename I> | |
9f95a23c | 214 | void Throttler<I>::print_status(ceph::Formatter *f) { |
31f18b77 FG |
215 | dout(20) << dendl; |
216 | ||
9f95a23c TL |
217 | std::lock_guard locker{m_lock}; |
218 | ||
219 | f->dump_int("max_parallel_requests", m_max_concurrent_ops); | |
220 | f->dump_int("running_requests", m_inflight_ops.size()); | |
221 | f->dump_int("waiting_requests", m_queue.size()); | |
7c673cae FG |
222 | } |
223 | ||
224 | template <typename I> | |
9f95a23c TL |
225 | const char** Throttler<I>::get_tracked_conf_keys() const { |
226 | return m_config_keys; | |
7c673cae FG |
227 | } |
228 | ||
229 | template <typename I> | |
9f95a23c | 230 | void Throttler<I>::handle_conf_change(const ConfigProxy& conf, |
20effc67 | 231 | const std::set<std::string> &changed) { |
9f95a23c TL |
232 | if (changed.count(m_config_key)) { |
233 | set_max_concurrent_ops(conf.get_val<uint64_t>(m_config_key)); | |
7c673cae FG |
234 | } |
235 | } | |
236 | ||
237 | } // namespace mirror | |
238 | } // namespace rbd | |
239 | ||
9f95a23c | 240 | template class rbd::mirror::Throttler<librbd::ImageCtx>; |