]>
git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/Throttler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 SUSE LINUX GmbH
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "Throttler.h"
16 #include "common/Formatter.h"
17 #include "common/debug.h"
18 #include "common/errno.h"
19 #include "librbd/Utils.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
24 #define dout_prefix *_dout << "rbd::mirror::Throttler:: " << this \
25 << " " << __func__ << ": "
31 Throttler
<I
>::Throttler(CephContext
*cct
, const std::string
&config_key
)
32 : m_cct(cct
), m_config_key(config_key
),
33 m_config_keys
{m_config_key
.c_str(), nullptr},
34 m_lock(ceph::make_mutex(
35 librbd::util::unique_lock_name("rbd::mirror::Throttler", this))),
36 m_max_concurrent_ops(cct
->_conf
.get_val
<uint64_t>(m_config_key
)) {
37 dout(20) << m_config_key
<< "=" << m_max_concurrent_ops
<< dendl
;
38 m_cct
->_conf
.add_observer(this);
42 Throttler
<I
>::~Throttler() {
43 m_cct
->_conf
.remove_observer(this);
45 std::lock_guard locker
{m_lock
};
46 ceph_assert(m_inflight_ops
.empty());
47 ceph_assert(m_queue
.empty());
51 void Throttler
<I
>::start_op(const std::string
&ns
,
52 const std::string
&id_
,
56 dout(20) << "id=" << id
<< dendl
;
60 std::lock_guard locker
{m_lock
};
62 if (m_inflight_ops
.count(id
) > 0) {
63 dout(20) << "duplicate for already started op " << id
<< dendl
;
64 } else if (m_queued_ops
.count(id
) > 0) {
65 dout(20) << "duplicate for already queued op " << id
<< dendl
;
66 std::swap(m_queued_ops
[id
], on_start
);
68 } else if (m_max_concurrent_ops
== 0 ||
69 m_inflight_ops
.size() < m_max_concurrent_ops
) {
70 ceph_assert(m_queue
.empty());
71 m_inflight_ops
.insert(id
);
72 dout(20) << "ready to start op for " << id
<< " ["
73 << m_inflight_ops
.size() << "/" << m_max_concurrent_ops
<< "]"
76 m_queue
.push_back(id
);
77 std::swap(m_queued_ops
[id
], on_start
);
78 dout(20) << "op for " << id
<< " has been queued" << dendl
;
82 if (on_start
!= nullptr) {
83 on_start
->complete(r
);
88 bool Throttler
<I
>::cancel_op(const std::string
&ns
,
89 const std::string
&id_
) {
92 dout(20) << "id=" << id
<< dendl
;
94 Context
*on_start
= nullptr;
96 std::lock_guard locker
{m_lock
};
97 auto it
= m_queued_ops
.find(id
);
98 if (it
!= m_queued_ops
.end()) {
99 dout(20) << "canceled queued op for " << id
<< dendl
;
101 on_start
= it
->second
;
102 m_queued_ops
.erase(it
);
106 if (on_start
== nullptr) {
110 on_start
->complete(-ECANCELED
);
114 template <typename I
>
115 void Throttler
<I
>::finish_op(const std::string
&ns
,
116 const std::string
&id_
) {
119 dout(20) << "id=" << id
<< dendl
;
121 if (cancel_op(ns
, id_
)) {
125 Context
*on_start
= nullptr;
127 std::lock_guard locker
{m_lock
};
129 m_inflight_ops
.erase(id
);
131 if (m_inflight_ops
.size() < m_max_concurrent_ops
&& !m_queue
.empty()) {
132 auto id
= m_queue
.front();
133 auto it
= m_queued_ops
.find(id
);
134 ceph_assert(it
!= m_queued_ops
.end());
135 m_inflight_ops
.insert(id
);
136 dout(20) << "ready to start op for " << id
<< " ["
137 << m_inflight_ops
.size() << "/" << m_max_concurrent_ops
<< "]"
139 on_start
= it
->second
;
140 m_queued_ops
.erase(it
);
145 if (on_start
!= nullptr) {
146 on_start
->complete(0);
150 template <typename I
>
151 void Throttler
<I
>::drain(const std::string
&ns
, int r
) {
152 dout(20) << "ns=" << ns
<< dendl
;
154 std::map
<Id
, Context
*> queued_ops
;
156 std::lock_guard locker
{m_lock
};
157 for (auto it
= m_queued_ops
.begin(); it
!= m_queued_ops
.end(); ) {
158 if (it
->first
.first
== ns
) {
159 queued_ops
[it
->first
] = it
->second
;
160 m_queue
.remove(it
->first
);
161 it
= m_queued_ops
.erase(it
);
166 for (auto it
= m_inflight_ops
.begin(); it
!= m_inflight_ops
.end(); ) {
167 if (it
->first
== ns
) {
168 dout(20) << "inflight_op " << *it
<< dendl
;
169 it
= m_inflight_ops
.erase(it
);
176 for (auto &it
: queued_ops
) {
177 dout(20) << "queued_op " << it
.first
<< dendl
;
178 it
.second
->complete(r
);
182 template <typename I
>
183 void Throttler
<I
>::set_max_concurrent_ops(uint32_t max
) {
184 dout(20) << "max=" << max
<< dendl
;
186 std::list
<Context
*> ops
;
188 std::lock_guard locker
{m_lock
};
189 m_max_concurrent_ops
= max
;
191 // Start waiting ops in the case of available free slots
192 while ((m_max_concurrent_ops
== 0 ||
193 m_inflight_ops
.size() < m_max_concurrent_ops
) &&
195 auto id
= m_queue
.front();
196 m_inflight_ops
.insert(id
);
197 dout(20) << "ready to start op for " << id
<< " ["
198 << m_inflight_ops
.size() << "/" << m_max_concurrent_ops
<< "]"
200 auto it
= m_queued_ops
.find(id
);
201 ceph_assert(it
!= m_queued_ops
.end());
202 ops
.push_back(it
->second
);
203 m_queued_ops
.erase(it
);
208 for (const auto& ctx
: ops
) {
213 template <typename I
>
214 void Throttler
<I
>::print_status(ceph::Formatter
*f
) {
217 std::lock_guard locker
{m_lock
};
219 f
->dump_int("max_parallel_requests", m_max_concurrent_ops
);
220 f
->dump_int("running_requests", m_inflight_ops
.size());
221 f
->dump_int("waiting_requests", m_queue
.size());
224 template <typename I
>
225 const char** Throttler
<I
>::get_tracked_conf_keys() const {
226 return m_config_keys
;
229 template <typename I
>
230 void Throttler
<I
>::handle_conf_change(const ConfigProxy
& conf
,
231 const set
<string
> &changed
) {
232 if (changed
.count(m_config_key
)) {
233 set_max_concurrent_ops(conf
.get_val
<uint64_t>(m_config_key
));
237 } // namespace mirror
240 template class rbd::mirror::Throttler
<librbd::ImageCtx
>;