]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc
update sources to v12.1.0
[ceph.git] / ceph / src / tools / rbd_mirror / ImageSyncThrottler.cc
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 SUSE LINUX GmbH
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14
15#include "ImageSyncThrottler.h"
31f18b77
FG
16#include "common/Formatter.h"
17#include "common/debug.h"
18#include "common/errno.h"
19#include "librbd/Utils.h"
7c673cae
FG
20
// Logging setup for this file: every dout() line is emitted against the
// global Ceph context under the rbd_mirror subsystem and is prefixed with
// the class name, the instance address and the calling function's name.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
                           << " " << __func__ << ": "
7c673cae
FG
26
27namespace rbd {
28namespace mirror {
29
7c673cae
FG
30template <typename I>
31ImageSyncThrottler<I>::ImageSyncThrottler()
31f18b77
FG
32 : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
33 this)),
34 m_max_concurrent_syncs(
35 g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) {
36 dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
7c673cae
FG
37 g_ceph_context->_conf->add_observer(this);
38}
39
40template <typename I>
41ImageSyncThrottler<I>::~ImageSyncThrottler() {
7c673cae 42 g_ceph_context->_conf->remove_observer(this);
31f18b77
FG
43
44 Mutex::Locker locker(m_lock);
45 assert(m_inflight_ops.empty());
46 assert(m_queue.empty());
7c673cae
FG
47}
48
49template <typename I>
31f18b77
FG
50void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
51 dout(20) << "id=" << id << dendl;
7c673cae 52
7c673cae 53 {
31f18b77
FG
54 Mutex::Locker locker(m_lock);
55
56 if (m_inflight_ops.count(id) > 0) {
57 dout(20) << "duplicate for already started op " << id << dendl;
58 } else if (m_max_concurrent_syncs == 0 ||
59 m_inflight_ops.size() < m_max_concurrent_syncs) {
60 assert(m_queue.empty());
61 m_inflight_ops.insert(id);
62 dout(20) << "ready to start sync for " << id << " ["
63 << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
64 << dendl;
7c673cae 65 } else {
31f18b77
FG
66 m_queue.push_back(std::make_pair(id, on_start));
67 on_start = nullptr;
68 dout(20) << "image sync for " << id << " has been queued" << dendl;
7c673cae
FG
69 }
70 }
71
31f18b77
FG
72 if (on_start != nullptr) {
73 on_start->complete(0);
7c673cae
FG
74 }
75}
76
77template <typename I>
31f18b77
FG
78bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
79 dout(20) << "id=" << id << dendl;
7c673cae 80
31f18b77 81 Context *on_start = nullptr;
7c673cae 82 {
31f18b77
FG
83 Mutex::Locker locker(m_lock);
84 for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
85 if (it->first == id) {
86 on_start = it->second;
87 dout(20) << "canceled queued sync for " << id << dendl;
88 m_queue.erase(it);
89 break;
7c673cae
FG
90 }
91 }
92 }
93
31f18b77
FG
94 if (on_start == nullptr) {
95 return false;
7c673cae 96 }
31f18b77
FG
97
98 on_start->complete(-ECANCELED);
99 return true;
7c673cae
FG
100}
101
102template <typename I>
31f18b77
FG
103void ImageSyncThrottler<I>::finish_op(const std::string &id) {
104 dout(20) << "id=" << id << dendl;
7c673cae 105
31f18b77
FG
106 if (cancel_op(id)) {
107 return;
108 }
7c673cae 109
31f18b77 110 Context *on_start = nullptr;
7c673cae 111 {
31f18b77
FG
112 Mutex::Locker locker(m_lock);
113
114 m_inflight_ops.erase(id);
115
116 if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
117 auto pair = m_queue.front();
118 m_inflight_ops.insert(pair.first);
119 dout(20) << "ready to start sync for " << pair.first << " ["
120 << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
121 << dendl;
122 on_start= pair.second;
123 m_queue.pop_front();
7c673cae 124 }
31f18b77 125 }
7c673cae 126
31f18b77
FG
127 if (on_start != nullptr) {
128 on_start->complete(0);
7c673cae 129 }
31f18b77 130}
7c673cae 131
31f18b77
FG
132template <typename I>
133void ImageSyncThrottler<I>::drain(int r) {
134 dout(20) << dendl;
135
136 std::list<std::pair<std::string, Context *>> queue;
137 {
138 Mutex::Locker locker(m_lock);
139 std::swap(m_queue, queue);
140 m_inflight_ops.clear();
141 }
142
143 for (auto &pair : queue) {
144 pair.second->complete(r);
7c673cae
FG
145 }
146}
147
148template <typename I>
149void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
31f18b77 150 dout(20) << "max=" << max << dendl;
7c673cae 151
31f18b77 152 std::list<Context *> ops;
7c673cae 153 {
31f18b77
FG
154 Mutex::Locker locker(m_lock);
155 m_max_concurrent_syncs = max;
156
157 // Start waiting ops in the case of available free slots
158 while ((m_max_concurrent_syncs == 0 ||
159 m_inflight_ops.size() < m_max_concurrent_syncs) &&
160 !m_queue.empty()) {
161 auto pair = m_queue.front();
162 m_inflight_ops.insert(pair.first);
163 dout(20) << "ready to start sync for " << pair.first << " ["
164 << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
165 << dendl;
166 ops.push_back(pair.second);
167 m_queue.pop_front();
7c673cae
FG
168 }
169 }
170
31f18b77
FG
171 for (const auto& ctx : ops) {
172 ctx->complete(0);
7c673cae
FG
173 }
174}
175
176template <typename I>
31f18b77
FG
177void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
178 dout(20) << dendl;
179
180 Mutex::Locker locker(m_lock);
7c673cae
FG
181
182 if (f) {
183 f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
31f18b77
FG
184 f->dump_int("running_syncs", m_inflight_ops.size());
185 f->dump_int("waiting_syncs", m_queue.size());
7c673cae
FG
186 f->flush(*ss);
187 } else {
188 *ss << "[ ";
189 *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
31f18b77
FG
190 *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
191 *ss << "waiting_syncs=" << m_queue.size() << " ]";
7c673cae
FG
192 }
193}
194
195template <typename I>
196const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
197 static const char* KEYS[] = {
198 "rbd_mirror_concurrent_image_syncs",
199 NULL
200 };
201 return KEYS;
202}
203
204template <typename I>
31f18b77
FG
205void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
206 const set<string> &changed) {
7c673cae
FG
207 if (changed.count("rbd_mirror_concurrent_image_syncs")) {
208 set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
209 }
210}
211
212} // namespace mirror
213} // namespace rbd
214
215template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;