]>
git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 SUSE LINUX GmbH
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "ImageSyncThrottler.h"
16 #include "common/Formatter.h"
17 #include "common/debug.h"
18 #include "common/errno.h"
19 #include "librbd/Utils.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
24 #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
25 << " " << __func__ << ": "
31 ImageSyncThrottler
<I
>::ImageSyncThrottler(CephContext
*cct
)
33 m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
35 m_max_concurrent_syncs(cct
->_conf
.get_val
<uint64_t>(
36 "rbd_mirror_concurrent_image_syncs")) {
37 dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs
<< dendl
;
38 m_cct
->_conf
.add_observer(this);
42 ImageSyncThrottler
<I
>::~ImageSyncThrottler() {
43 m_cct
->_conf
.remove_observer(this);
45 Mutex::Locker
locker(m_lock
);
46 ceph_assert(m_inflight_ops
.empty());
47 ceph_assert(m_queue
.empty());
51 void ImageSyncThrottler
<I
>::start_op(const std::string
&id
, Context
*on_start
) {
52 dout(20) << "id=" << id
<< dendl
;
56 Mutex::Locker
locker(m_lock
);
58 if (m_inflight_ops
.count(id
) > 0) {
59 dout(20) << "duplicate for already started op " << id
<< dendl
;
60 } else if (m_queued_ops
.count(id
) > 0) {
61 dout(20) << "duplicate for already queued op " << id
<< dendl
;
62 std::swap(m_queued_ops
[id
], on_start
);
64 } else if (m_max_concurrent_syncs
== 0 ||
65 m_inflight_ops
.size() < m_max_concurrent_syncs
) {
66 ceph_assert(m_queue
.empty());
67 m_inflight_ops
.insert(id
);
68 dout(20) << "ready to start sync for " << id
<< " ["
69 << m_inflight_ops
.size() << "/" << m_max_concurrent_syncs
<< "]"
72 m_queue
.push_back(id
);
73 std::swap(m_queued_ops
[id
], on_start
);
74 dout(20) << "image sync for " << id
<< " has been queued" << dendl
;
78 if (on_start
!= nullptr) {
79 on_start
->complete(r
);
84 bool ImageSyncThrottler
<I
>::cancel_op(const std::string
&id
) {
85 dout(20) << "id=" << id
<< dendl
;
87 Context
*on_start
= nullptr;
89 Mutex::Locker
locker(m_lock
);
90 auto it
= m_queued_ops
.find(id
);
91 if (it
!= m_queued_ops
.end()) {
92 dout(20) << "canceled queued sync for " << id
<< dendl
;
94 on_start
= it
->second
;
95 m_queued_ops
.erase(it
);
99 if (on_start
== nullptr) {
103 on_start
->complete(-ECANCELED
);
107 template <typename I
>
108 void ImageSyncThrottler
<I
>::finish_op(const std::string
&id
) {
109 dout(20) << "id=" << id
<< dendl
;
115 Context
*on_start
= nullptr;
117 Mutex::Locker
locker(m_lock
);
119 m_inflight_ops
.erase(id
);
121 if (m_inflight_ops
.size() < m_max_concurrent_syncs
&& !m_queue
.empty()) {
122 auto id
= m_queue
.front();
123 auto it
= m_queued_ops
.find(id
);
124 ceph_assert(it
!= m_queued_ops
.end());
125 m_inflight_ops
.insert(id
);
126 dout(20) << "ready to start sync for " << id
<< " ["
127 << m_inflight_ops
.size() << "/" << m_max_concurrent_syncs
<< "]"
129 on_start
= it
->second
;
130 m_queued_ops
.erase(it
);
135 if (on_start
!= nullptr) {
136 on_start
->complete(0);
140 template <typename I
>
141 void ImageSyncThrottler
<I
>::drain(int r
) {
144 std::map
<std::string
, Context
*> queued_ops
;
146 Mutex::Locker
locker(m_lock
);
147 std::swap(m_queued_ops
, queued_ops
);
149 m_inflight_ops
.clear();
152 for (auto &it
: queued_ops
) {
153 it
.second
->complete(r
);
157 template <typename I
>
158 void ImageSyncThrottler
<I
>::set_max_concurrent_syncs(uint32_t max
) {
159 dout(20) << "max=" << max
<< dendl
;
161 std::list
<Context
*> ops
;
163 Mutex::Locker
locker(m_lock
);
164 m_max_concurrent_syncs
= max
;
166 // Start waiting ops in the case of available free slots
167 while ((m_max_concurrent_syncs
== 0 ||
168 m_inflight_ops
.size() < m_max_concurrent_syncs
) &&
170 auto id
= m_queue
.front();
171 m_inflight_ops
.insert(id
);
172 dout(20) << "ready to start sync for " << id
<< " ["
173 << m_inflight_ops
.size() << "/" << m_max_concurrent_syncs
<< "]"
175 auto it
= m_queued_ops
.find(id
);
176 ceph_assert(it
!= m_queued_ops
.end());
177 ops
.push_back(it
->second
);
178 m_queued_ops
.erase(it
);
183 for (const auto& ctx
: ops
) {
188 template <typename I
>
189 void ImageSyncThrottler
<I
>::print_status(Formatter
*f
, std::stringstream
*ss
) {
192 Mutex::Locker
locker(m_lock
);
195 f
->dump_int("max_parallel_syncs", m_max_concurrent_syncs
);
196 f
->dump_int("running_syncs", m_inflight_ops
.size());
197 f
->dump_int("waiting_syncs", m_queue
.size());
201 *ss
<< "max_parallel_syncs=" << m_max_concurrent_syncs
<< ", ";
202 *ss
<< "running_syncs=" << m_inflight_ops
.size() << ", ";
203 *ss
<< "waiting_syncs=" << m_queue
.size() << " ]";
207 template <typename I
>
208 const char** ImageSyncThrottler
<I
>::get_tracked_conf_keys() const {
209 static const char* KEYS
[] = {
210 "rbd_mirror_concurrent_image_syncs",
216 template <typename I
>
217 void ImageSyncThrottler
<I
>::handle_conf_change(const ConfigProxy
& conf
,
218 const set
<string
> &changed
) {
219 if (changed
.count("rbd_mirror_concurrent_image_syncs")) {
220 set_max_concurrent_syncs(conf
.get_val
<uint64_t>("rbd_mirror_concurrent_image_syncs"));
224 } // namespace mirror
227 template class rbd::mirror::ImageSyncThrottler
<librbd::ImageCtx
>;