]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/WBThrottle.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / os / filestore / WBThrottle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "acconfig.h"
5
6 #include "os/filestore/WBThrottle.h"
7 #include "common/perf_counters.h"
8 #include "common/errno.h"
9
10 WBThrottle::WBThrottle(CephContext *cct) :
11 cur_ios(0), cur_size(0),
12 cct(cct),
13 logger(NULL),
14 stopping(true),
15 fs(XFS)
16 {
17 {
18 std::lock_guard l{lock};
19 set_from_conf();
20 }
21 ceph_assert(cct);
22 PerfCountersBuilder b(
23 cct, string("WBThrottle"),
24 l_wbthrottle_first, l_wbthrottle_last);
25 b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied", "Dirty data", NULL, 0, unit_t(UNIT_BYTES));
26 b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb", "Written data", NULL, 0, unit_t(UNIT_BYTES));
27 b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied", "Dirty operations");
28 b.add_u64(l_wbthrottle_ios_wb, "ios_wb", "Written operations");
29 b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied", "Entries waiting for write");
30 b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb", "Written entries");
31 logger = b.create_perf_counters();
32 cct->get_perfcounters_collection()->add(logger);
33 for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i)
34 logger->set(i, 0);
35
36 cct->_conf.add_observer(this);
37 }
38
39 WBThrottle::~WBThrottle() {
40 ceph_assert(cct);
41 cct->get_perfcounters_collection()->remove(logger);
42 delete logger;
43 cct->_conf.remove_observer(this);
44 }
45
46 void WBThrottle::start()
47 {
48 {
49 std::lock_guard l{lock};
50 stopping = false;
51 }
52 create("wb_throttle");
53 }
54
55 void WBThrottle::stop()
56 {
57 {
58 std::lock_guard l{lock};
59 stopping = true;
60 cond.notify_all();
61 }
62
63 join();
64 }
65
66 const char** WBThrottle::get_tracked_conf_keys() const
67 {
68 static const char* KEYS[] = {
69 "filestore_wbthrottle_btrfs_bytes_start_flusher",
70 "filestore_wbthrottle_btrfs_bytes_hard_limit",
71 "filestore_wbthrottle_btrfs_ios_start_flusher",
72 "filestore_wbthrottle_btrfs_ios_hard_limit",
73 "filestore_wbthrottle_btrfs_inodes_start_flusher",
74 "filestore_wbthrottle_btrfs_inodes_hard_limit",
75 "filestore_wbthrottle_xfs_bytes_start_flusher",
76 "filestore_wbthrottle_xfs_bytes_hard_limit",
77 "filestore_wbthrottle_xfs_ios_start_flusher",
78 "filestore_wbthrottle_xfs_ios_hard_limit",
79 "filestore_wbthrottle_xfs_inodes_start_flusher",
80 "filestore_wbthrottle_xfs_inodes_hard_limit",
81 NULL
82 };
83 return KEYS;
84 }
85
86 void WBThrottle::set_from_conf()
87 {
88 ceph_assert(ceph_mutex_is_locked(lock));
89 if (fs == BTRFS) {
90 size_limits.first =
91 cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
92 size_limits.second =
93 cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
94 io_limits.first =
95 cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
96 io_limits.second =
97 cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
98 fd_limits.first =
99 cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
100 fd_limits.second =
101 cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
102 } else if (fs == XFS) {
103 size_limits.first =
104 cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
105 size_limits.second =
106 cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
107 io_limits.first =
108 cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
109 io_limits.second =
110 cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
111 fd_limits.first =
112 cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
113 fd_limits.second =
114 cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
115 } else {
116 ceph_abort_msg("invalid value for fs");
117 }
118 cond.notify_all();
119 }
120
121 void WBThrottle::handle_conf_change(const ConfigProxy& conf,
122 const std::set<std::string> &changed)
123 {
124 std::lock_guard l{lock};
125 for (const char** i = get_tracked_conf_keys(); *i; ++i) {
126 if (changed.count(*i)) {
127 set_from_conf();
128 return;
129 }
130 }
131 }
132
133 bool WBThrottle::get_next_should_flush(
134 std::unique_lock<ceph::mutex>& locker,
135 boost::tuple<ghobject_t, FDRef, PendingWB> *next)
136 {
137 ceph_assert(ceph_mutex_is_locked(lock));
138 ceph_assert(next);
139 {
140 cond.wait(locker, [this] {
141 return stopping || (beyond_limit() && !pending_wbs.empty());
142 });
143 }
144 if (stopping)
145 return false;
146 ceph_assert(!pending_wbs.empty());
147 ghobject_t obj(pop_object());
148
149 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
150 pending_wbs.find(obj);
151 *next = boost::make_tuple(obj, i->second.second, i->second.first);
152 pending_wbs.erase(i);
153 return true;
154 }
155
156
157 void *WBThrottle::entry()
158 {
159 std::unique_lock l{lock};
160 boost::tuple<ghobject_t, FDRef, PendingWB> wb;
161 while (get_next_should_flush(l, &wb)) {
162 clearing = wb.get<0>();
163 cur_ios -= wb.get<2>().ios;
164 logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
165 logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios);
166 cur_size -= wb.get<2>().size;
167 logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size);
168 logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
169 logger->dec(l_wbthrottle_inodes_dirtied);
170 logger->inc(l_wbthrottle_inodes_wb);
171 l.unlock();
172 #if defined(HAVE_FDATASYNC)
173 int r = ::fdatasync(**wb.get<1>());
174 #else
175 int r = ::fsync(**wb.get<1>());
176 #endif
177 if (r < 0) {
178 lderr(cct) << "WBThrottle fsync failed: " << cpp_strerror(errno) << dendl;
179 ceph_abort();
180 }
181 #ifdef HAVE_POSIX_FADVISE
182 if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) {
183 int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
184 ceph_assert(fa_r == 0);
185 }
186 #endif
187 l.lock();
188 clearing = ghobject_t();
189 cond.notify_all();
190 wb = boost::tuple<ghobject_t, FDRef, PendingWB>();
191 }
192 return 0;
193 }
194
195 void WBThrottle::queue_wb(
196 FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len,
197 bool nocache)
198 {
199 std::lock_guard l{lock};
200 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
201 pending_wbs.find(hoid);
202 if (wbiter == pending_wbs.end()) {
203 wbiter = pending_wbs.insert(
204 make_pair(hoid,
205 make_pair(
206 PendingWB(),
207 fd))).first;
208 logger->inc(l_wbthrottle_inodes_dirtied);
209 } else {
210 remove_object(hoid);
211 }
212
213 cur_ios++;
214 logger->inc(l_wbthrottle_ios_dirtied);
215 cur_size += len;
216 logger->inc(l_wbthrottle_bytes_dirtied, len);
217
218 wbiter->second.first.add(nocache, len, 1);
219 insert_object(hoid);
220 if (beyond_limit())
221 cond.notify_all();
222 }
223
224 void WBThrottle::clear()
225 {
226 std::lock_guard l{lock};
227 for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
228 pending_wbs.begin();
229 i != pending_wbs.end();
230 ++i) {
231 #ifdef HAVE_POSIX_FADVISE
232 if (cct->_conf->filestore_fadvise && i->second.first.nocache) {
233 int fa_r = posix_fadvise(**i->second.second, 0, 0, POSIX_FADV_DONTNEED);
234 ceph_assert(fa_r == 0);
235 }
236 #endif
237
238 }
239 cur_ios = cur_size = 0;
240 logger->set(l_wbthrottle_ios_dirtied, 0);
241 logger->set(l_wbthrottle_bytes_dirtied, 0);
242 logger->set(l_wbthrottle_inodes_dirtied, 0);
243 pending_wbs.clear();
244 lru.clear();
245 rev_lru.clear();
246 cond.notify_all();
247 }
248
249 void WBThrottle::clear_object(const ghobject_t &hoid)
250 {
251 std::unique_lock l{lock};
252 cond.wait(l, [hoid, this] { return clearing != hoid; });
253 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
254 pending_wbs.find(hoid);
255 if (i == pending_wbs.end())
256 return;
257
258 cur_ios -= i->second.first.ios;
259 logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios);
260 cur_size -= i->second.first.size;
261 logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size);
262 logger->dec(l_wbthrottle_inodes_dirtied);
263
264 pending_wbs.erase(i);
265 remove_object(hoid);
266 cond.notify_all();
267 }
268
269 void WBThrottle::throttle()
270 {
271 std::unique_lock l{lock};
272 cond.wait(l, [this] { return stopping || !need_flush(); });
273 }