]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc
update download target update for octopus release
[ceph.git] / ceph / src / tools / rbd_mirror / ImageSyncThrottler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 SUSE LINUX GmbH
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "ImageSyncThrottler.h"
16 #include "common/Formatter.h"
17 #include "common/debug.h"
18 #include "common/errno.h"
19 #include "librbd/Utils.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
23 #undef dout_prefix
24 #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
25 << " " << __func__ << ": "
26
27 namespace rbd {
28 namespace mirror {
29
30 template <typename I>
31 ImageSyncThrottler<I>::ImageSyncThrottler(CephContext *cct)
32 : m_cct(cct),
33 m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
34 this)),
35 m_max_concurrent_syncs(cct->_conf.get_val<uint64_t>(
36 "rbd_mirror_concurrent_image_syncs")) {
37 dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
38 m_cct->_conf.add_observer(this);
39 }
40
41 template <typename I>
42 ImageSyncThrottler<I>::~ImageSyncThrottler() {
43 m_cct->_conf.remove_observer(this);
44
45 Mutex::Locker locker(m_lock);
46 ceph_assert(m_inflight_ops.empty());
47 ceph_assert(m_queue.empty());
48 }
49
50 template <typename I>
51 void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
52 dout(20) << "id=" << id << dendl;
53
54 int r = 0;
55 {
56 Mutex::Locker locker(m_lock);
57
58 if (m_inflight_ops.count(id) > 0) {
59 dout(20) << "duplicate for already started op " << id << dendl;
60 } else if (m_queued_ops.count(id) > 0) {
61 dout(20) << "duplicate for already queued op " << id << dendl;
62 std::swap(m_queued_ops[id], on_start);
63 r = -ENOENT;
64 } else if (m_max_concurrent_syncs == 0 ||
65 m_inflight_ops.size() < m_max_concurrent_syncs) {
66 ceph_assert(m_queue.empty());
67 m_inflight_ops.insert(id);
68 dout(20) << "ready to start sync for " << id << " ["
69 << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
70 << dendl;
71 } else {
72 m_queue.push_back(id);
73 std::swap(m_queued_ops[id], on_start);
74 dout(20) << "image sync for " << id << " has been queued" << dendl;
75 }
76 }
77
78 if (on_start != nullptr) {
79 on_start->complete(r);
80 }
81 }
82
83 template <typename I>
84 bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
85 dout(20) << "id=" << id << dendl;
86
87 Context *on_start = nullptr;
88 {
89 Mutex::Locker locker(m_lock);
90 auto it = m_queued_ops.find(id);
91 if (it != m_queued_ops.end()) {
92 dout(20) << "canceled queued sync for " << id << dendl;
93 m_queue.remove(id);
94 on_start = it->second;
95 m_queued_ops.erase(it);
96 }
97 }
98
99 if (on_start == nullptr) {
100 return false;
101 }
102
103 on_start->complete(-ECANCELED);
104 return true;
105 }
106
107 template <typename I>
108 void ImageSyncThrottler<I>::finish_op(const std::string &id) {
109 dout(20) << "id=" << id << dendl;
110
111 if (cancel_op(id)) {
112 return;
113 }
114
115 Context *on_start = nullptr;
116 {
117 Mutex::Locker locker(m_lock);
118
119 m_inflight_ops.erase(id);
120
121 if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
122 auto id = m_queue.front();
123 auto it = m_queued_ops.find(id);
124 ceph_assert(it != m_queued_ops.end());
125 m_inflight_ops.insert(id);
126 dout(20) << "ready to start sync for " << id << " ["
127 << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
128 << dendl;
129 on_start = it->second;
130 m_queued_ops.erase(it);
131 m_queue.pop_front();
132 }
133 }
134
135 if (on_start != nullptr) {
136 on_start->complete(0);
137 }
138 }
139
140 template <typename I>
141 void ImageSyncThrottler<I>::drain(int r) {
142 dout(20) << dendl;
143
144 std::map<std::string, Context *> queued_ops;
145 {
146 Mutex::Locker locker(m_lock);
147 std::swap(m_queued_ops, queued_ops);
148 m_queue.clear();
149 m_inflight_ops.clear();
150 }
151
152 for (auto &it : queued_ops) {
153 it.second->complete(r);
154 }
155 }
156
157 template <typename I>
158 void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
159 dout(20) << "max=" << max << dendl;
160
161 std::list<Context *> ops;
162 {
163 Mutex::Locker locker(m_lock);
164 m_max_concurrent_syncs = max;
165
166 // Start waiting ops in the case of available free slots
167 while ((m_max_concurrent_syncs == 0 ||
168 m_inflight_ops.size() < m_max_concurrent_syncs) &&
169 !m_queue.empty()) {
170 auto id = m_queue.front();
171 m_inflight_ops.insert(id);
172 dout(20) << "ready to start sync for " << id << " ["
173 << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
174 << dendl;
175 auto it = m_queued_ops.find(id);
176 ceph_assert(it != m_queued_ops.end());
177 ops.push_back(it->second);
178 m_queued_ops.erase(it);
179 m_queue.pop_front();
180 }
181 }
182
183 for (const auto& ctx : ops) {
184 ctx->complete(0);
185 }
186 }
187
188 template <typename I>
189 void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
190 dout(20) << dendl;
191
192 Mutex::Locker locker(m_lock);
193
194 if (f) {
195 f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
196 f->dump_int("running_syncs", m_inflight_ops.size());
197 f->dump_int("waiting_syncs", m_queue.size());
198 f->flush(*ss);
199 } else {
200 *ss << "[ ";
201 *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
202 *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
203 *ss << "waiting_syncs=" << m_queue.size() << " ]";
204 }
205 }
206
207 template <typename I>
208 const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
209 static const char* KEYS[] = {
210 "rbd_mirror_concurrent_image_syncs",
211 NULL
212 };
213 return KEYS;
214 }
215
216 template <typename I>
217 void ImageSyncThrottler<I>::handle_conf_change(const ConfigProxy& conf,
218 const set<string> &changed) {
219 if (changed.count("rbd_mirror_concurrent_image_syncs")) {
220 set_max_concurrent_syncs(conf.get_val<uint64_t>("rbd_mirror_concurrent_image_syncs"));
221 }
222 }
223
224 } // namespace mirror
225 } // namespace rbd
226
227 template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;