// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 SUSE LINUX GmbH
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "Throttler.h"
#include "common/Formatter.h"
#include "common/debug.h"
#include "common/errno.h"
#include "librbd/Utils.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::Throttler:: " << this \
                           << " " << __func__ << ": "

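// The Throttler bounds how many operations may run concurrently across all
// namespaces. Callers request a slot with start_op(); the supplied Context
// is completed immediately when a slot is free (or when the configured limit
// is 0, meaning unlimited), otherwise the operation is queued until
// finish_op() releases a slot. Queued callbacks can be aborted individually
// with cancel_op() or per namespace with drain(), and the limit itself
// tracks the configuration key passed to the constructor.
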
namespace rbd {
namespace mirror {

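// Read the initial limit from the configured key and register as a config
// observer so later changes to that key take effect at runtime.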
template <typename I>
Throttler<I>::Throttler(CephContext *cct, const std::string &config_key)
  : m_cct(cct), m_config_key(config_key),
    m_config_keys{m_config_key.c_str(), nullptr},
    m_lock(ceph::make_mutex(
      librbd::util::unique_lock_name("rbd::mirror::Throttler", this))),
    m_max_concurrent_ops(cct->_conf.get_val<uint64_t>(m_config_key)) {
  dout(20) << m_config_key << "=" << m_max_concurrent_ops << dendl;
  m_cct->_conf.add_observer(this);
}

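// Unregister the config observer; by this point every operation must have
// been finished or drained.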
template <typename I>
Throttler<I>::~Throttler() {
  m_cct->_conf.remove_observer(this);

  std::lock_guard locker{m_lock};
  ceph_assert(m_inflight_ops.empty());
  ceph_assert(m_queue.empty());
}

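// Request a slot for the operation identified by (ns, id_). A duplicate
// request for an already running op completes the callback right away with
// 0; a duplicate for an already queued op replaces the stored callback and
// completes the previous one with -ENOENT; otherwise the op either starts
// immediately (free slot or unlimited) or is appended to the wait queue.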
template <typename I>
void Throttler<I>::start_op(const std::string &ns,
                            const std::string &id_,
                            Context *on_start) {
  Id id{ns, id_};

  dout(20) << "id=" << id << dendl;

  int r = 0;
  {
    std::lock_guard locker{m_lock};

    if (m_inflight_ops.count(id) > 0) {
      dout(20) << "duplicate for already started op " << id << dendl;
    } else if (m_queued_ops.count(id) > 0) {
      dout(20) << "duplicate for already queued op " << id << dendl;
      std::swap(m_queued_ops[id], on_start);
      r = -ENOENT;
    } else if (m_max_concurrent_ops == 0 ||
               m_inflight_ops.size() < m_max_concurrent_ops) {
      ceph_assert(m_queue.empty());
      m_inflight_ops.insert(id);
      dout(20) << "ready to start op for " << id << " ["
               << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
               << dendl;
    } else {
      m_queue.push_back(id);
      std::swap(m_queued_ops[id], on_start);
      dout(20) << "op for " << id << " has been queued" << dendl;
    }
  }

  if (on_start != nullptr) {
    on_start->complete(r);
  }
}

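// Remove a still-queued operation and complete its callback with -ECANCELED.
// Returns false if the op is not waiting in the queue (already running or
// unknown).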
template <typename I>
bool Throttler<I>::cancel_op(const std::string &ns,
                             const std::string &id_) {
  Id id{ns, id_};

  dout(20) << "id=" << id << dendl;

  Context *on_start = nullptr;
  {
    std::lock_guard locker{m_lock};
    auto it = m_queued_ops.find(id);
    if (it != m_queued_ops.end()) {
      dout(20) << "canceled queued op for " << id << dendl;
      m_queue.remove(id);
      on_start = it->second;
      m_queued_ops.erase(it);
    }
  }

  if (on_start == nullptr) {
    return false;
  }

  on_start->complete(-ECANCELED);
  return true;
}

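// Release the slot held by (ns, id_) and promote the next queued operation,
// if any. The initial cancel_op() call handles the case where the op never
// actually started: its queued callback is completed with -ECANCELED instead.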
template <typename I>
void Throttler<I>::finish_op(const std::string &ns,
                             const std::string &id_) {
  Id id{ns, id_};

  dout(20) << "id=" << id << dendl;

  if (cancel_op(ns, id_)) {
    return;
  }

  Context *on_start = nullptr;
  {
    std::lock_guard locker{m_lock};

    m_inflight_ops.erase(id);

    if (m_inflight_ops.size() < m_max_concurrent_ops && !m_queue.empty()) {
      auto id = m_queue.front();
      auto it = m_queued_ops.find(id);
      ceph_assert(it != m_queued_ops.end());
      m_inflight_ops.insert(id);
      dout(20) << "ready to start op for " << id << " ["
               << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
               << dendl;
      on_start = it->second;
      m_queued_ops.erase(it);
      m_queue.pop_front();
    }
  }

  if (on_start != nullptr) {
    on_start->complete(0);
  }
}

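// Drop every operation belonging to the given namespace: queued callbacks
// are completed with the supplied error code r, and matching in-flight
// entries are forgotten so their slots become available again.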
template <typename I>
void Throttler<I>::drain(const std::string &ns, int r) {
  dout(20) << "ns=" << ns << dendl;

  std::map<Id, Context *> queued_ops;
  {
    std::lock_guard locker{m_lock};
    for (auto it = m_queued_ops.begin(); it != m_queued_ops.end(); ) {
      if (it->first.first == ns) {
        queued_ops[it->first] = it->second;
        m_queue.remove(it->first);
        it = m_queued_ops.erase(it);
      } else {
        it++;
      }
    }
    for (auto it = m_inflight_ops.begin(); it != m_inflight_ops.end(); ) {
      if (it->first == ns) {
        dout(20) << "inflight_op " << *it << dendl;
        it = m_inflight_ops.erase(it);
      } else {
        it++;
      }
    }
  }

  for (auto &it : queued_ops) {
    dout(20) << "queued_op " << it.first << dendl;
    it.second->complete(r);
  }
}

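// Apply a new concurrency limit. If the limit grew (or became 0, i.e.
// unlimited), start as many queued operations as now fit; their callbacks
// are completed outside the lock.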
template <typename I>
void Throttler<I>::set_max_concurrent_ops(uint32_t max) {
  dout(20) << "max=" << max << dendl;

  std::list<Context *> ops;
  {
    std::lock_guard locker{m_lock};
    m_max_concurrent_ops = max;

    // Start waiting ops if free slots are now available
    while ((m_max_concurrent_ops == 0 ||
            m_inflight_ops.size() < m_max_concurrent_ops) &&
           !m_queue.empty()) {
      auto id = m_queue.front();
      m_inflight_ops.insert(id);
      dout(20) << "ready to start op for " << id << " ["
               << m_inflight_ops.size() << "/" << m_max_concurrent_ops << "]"
               << dendl;
      auto it = m_queued_ops.find(id);
      ceph_assert(it != m_queued_ops.end());
      ops.push_back(it->second);
      m_queued_ops.erase(it);
      m_queue.pop_front();
    }
  }

  for (const auto& ctx : ops) {
    ctx->complete(0);
  }
}

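// Dump the current limit, the number of running operations and the queue
// depth through the provided Formatter.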
template <typename I>
void Throttler<I>::print_status(ceph::Formatter *f) {
  dout(20) << dendl;

  std::lock_guard locker{m_lock};

  f->dump_int("max_parallel_requests", m_max_concurrent_ops);
  f->dump_int("running_requests", m_inflight_ops.size());
  f->dump_int("waiting_requests", m_queue.size());
}

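// Config-observer hooks: expose the tracked key and re-apply the limit
// whenever its value changes.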
template <typename I>
const char** Throttler<I>::get_tracked_conf_keys() const {
  return m_config_keys;
}

template <typename I>
void Throttler<I>::handle_conf_change(const ConfigProxy& conf,
                                      const std::set<std::string> &changed) {
  if (changed.count(m_config_key)) {
    set_max_concurrent_ops(conf.get_val<uint64_t>(m_config_key));
  }
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::Throttler<librbd::ImageCtx>;