]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/WBThrottle.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / os / filestore / WBThrottle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "acconfig.h"
5
6 #include "os/filestore/WBThrottle.h"
7 #include "common/perf_counters.h"
8 #include "common/errno.h"
9
10 using std::pair;
11 using std::string;
12
13 WBThrottle::WBThrottle(CephContext *cct) :
14 cur_ios(0), cur_size(0),
15 cct(cct),
16 logger(NULL),
17 stopping(true),
18 fs(XFS)
19 {
20 {
21 std::lock_guard l{lock};
22 set_from_conf();
23 }
24 ceph_assert(cct);
25 PerfCountersBuilder b(
26 cct, string("WBThrottle"),
27 l_wbthrottle_first, l_wbthrottle_last);
28 b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied", "Dirty data", NULL, 0, unit_t(UNIT_BYTES));
29 b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb", "Written data", NULL, 0, unit_t(UNIT_BYTES));
30 b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied", "Dirty operations");
31 b.add_u64(l_wbthrottle_ios_wb, "ios_wb", "Written operations");
32 b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied", "Entries waiting for write");
33 b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb", "Written entries");
34 logger = b.create_perf_counters();
35 cct->get_perfcounters_collection()->add(logger);
36 for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i)
37 logger->set(i, 0);
38
39 cct->_conf.add_observer(this);
40 }
41
42 WBThrottle::~WBThrottle() {
43 ceph_assert(cct);
44 cct->get_perfcounters_collection()->remove(logger);
45 delete logger;
46 cct->_conf.remove_observer(this);
47 }
48
49 void WBThrottle::start()
50 {
51 {
52 std::lock_guard l{lock};
53 stopping = false;
54 }
55 create("wb_throttle");
56 }
57
58 void WBThrottle::stop()
59 {
60 {
61 std::lock_guard l{lock};
62 stopping = true;
63 cond.notify_all();
64 }
65
66 join();
67 }
68
69 const char** WBThrottle::get_tracked_conf_keys() const
70 {
71 static const char* KEYS[] = {
72 "filestore_wbthrottle_btrfs_bytes_start_flusher",
73 "filestore_wbthrottle_btrfs_bytes_hard_limit",
74 "filestore_wbthrottle_btrfs_ios_start_flusher",
75 "filestore_wbthrottle_btrfs_ios_hard_limit",
76 "filestore_wbthrottle_btrfs_inodes_start_flusher",
77 "filestore_wbthrottle_btrfs_inodes_hard_limit",
78 "filestore_wbthrottle_xfs_bytes_start_flusher",
79 "filestore_wbthrottle_xfs_bytes_hard_limit",
80 "filestore_wbthrottle_xfs_ios_start_flusher",
81 "filestore_wbthrottle_xfs_ios_hard_limit",
82 "filestore_wbthrottle_xfs_inodes_start_flusher",
83 "filestore_wbthrottle_xfs_inodes_hard_limit",
84 NULL
85 };
86 return KEYS;
87 }
88
89 void WBThrottle::set_from_conf()
90 {
91 ceph_assert(ceph_mutex_is_locked(lock));
92 if (fs == BTRFS) {
93 size_limits.first =
94 cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
95 size_limits.second =
96 cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
97 io_limits.first =
98 cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
99 io_limits.second =
100 cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
101 fd_limits.first =
102 cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
103 fd_limits.second =
104 cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
105 } else if (fs == XFS) {
106 size_limits.first =
107 cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
108 size_limits.second =
109 cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
110 io_limits.first =
111 cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
112 io_limits.second =
113 cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
114 fd_limits.first =
115 cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
116 fd_limits.second =
117 cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
118 } else {
119 ceph_abort_msg("invalid value for fs");
120 }
121 cond.notify_all();
122 }
123
124 void WBThrottle::handle_conf_change(const ConfigProxy& conf,
125 const std::set<std::string> &changed)
126 {
127 std::lock_guard l{lock};
128 for (const char** i = get_tracked_conf_keys(); *i; ++i) {
129 if (changed.count(*i)) {
130 set_from_conf();
131 return;
132 }
133 }
134 }
135
136 bool WBThrottle::get_next_should_flush(
137 std::unique_lock<ceph::mutex>& locker,
138 boost::tuple<ghobject_t, FDRef, PendingWB> *next)
139 {
140 ceph_assert(ceph_mutex_is_locked(lock));
141 ceph_assert(next);
142 {
143 cond.wait(locker, [this] {
144 return stopping || (beyond_limit() && !pending_wbs.empty());
145 });
146 }
147 if (stopping)
148 return false;
149 ceph_assert(!pending_wbs.empty());
150 ghobject_t obj(pop_object());
151
152 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
153 pending_wbs.find(obj);
154 *next = boost::make_tuple(obj, i->second.second, i->second.first);
155 pending_wbs.erase(i);
156 return true;
157 }
158
159
160 void *WBThrottle::entry()
161 {
162 std::unique_lock l{lock};
163 boost::tuple<ghobject_t, FDRef, PendingWB> wb;
164 while (get_next_should_flush(l, &wb)) {
165 clearing = wb.get<0>();
166 cur_ios -= wb.get<2>().ios;
167 logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
168 logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios);
169 cur_size -= wb.get<2>().size;
170 logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size);
171 logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
172 logger->dec(l_wbthrottle_inodes_dirtied);
173 logger->inc(l_wbthrottle_inodes_wb);
174 l.unlock();
175 #if defined(HAVE_FDATASYNC)
176 int r = ::fdatasync(**wb.get<1>());
177 #else
178 int r = ::fsync(**wb.get<1>());
179 #endif
180 if (r < 0) {
181 lderr(cct) << "WBThrottle fsync failed: " << cpp_strerror(errno) << dendl;
182 ceph_abort();
183 }
184 #ifdef HAVE_POSIX_FADVISE
185 if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) {
186 int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
187 ceph_assert(fa_r == 0);
188 }
189 #endif
190 l.lock();
191 clearing = ghobject_t();
192 cond.notify_all();
193 wb = boost::tuple<ghobject_t, FDRef, PendingWB>();
194 }
195 return 0;
196 }
197
198 void WBThrottle::queue_wb(
199 FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len,
200 bool nocache)
201 {
202 std::lock_guard l{lock};
203 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
204 pending_wbs.find(hoid);
205 if (wbiter == pending_wbs.end()) {
206 wbiter = pending_wbs.insert(
207 make_pair(hoid,
208 make_pair(
209 PendingWB(),
210 fd))).first;
211 logger->inc(l_wbthrottle_inodes_dirtied);
212 } else {
213 remove_object(hoid);
214 }
215
216 cur_ios++;
217 logger->inc(l_wbthrottle_ios_dirtied);
218 cur_size += len;
219 logger->inc(l_wbthrottle_bytes_dirtied, len);
220
221 wbiter->second.first.add(nocache, len, 1);
222 insert_object(hoid);
223 if (beyond_limit())
224 cond.notify_all();
225 }
226
227 void WBThrottle::clear()
228 {
229 std::lock_guard l{lock};
230 for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
231 pending_wbs.begin();
232 i != pending_wbs.end();
233 ++i) {
234 #ifdef HAVE_POSIX_FADVISE
235 if (cct->_conf->filestore_fadvise && i->second.first.nocache) {
236 int fa_r = posix_fadvise(**i->second.second, 0, 0, POSIX_FADV_DONTNEED);
237 ceph_assert(fa_r == 0);
238 }
239 #endif
240
241 }
242 cur_ios = cur_size = 0;
243 logger->set(l_wbthrottle_ios_dirtied, 0);
244 logger->set(l_wbthrottle_bytes_dirtied, 0);
245 logger->set(l_wbthrottle_inodes_dirtied, 0);
246 pending_wbs.clear();
247 lru.clear();
248 rev_lru.clear();
249 cond.notify_all();
250 }
251
252 void WBThrottle::clear_object(const ghobject_t &hoid)
253 {
254 std::unique_lock l{lock};
255 cond.wait(l, [hoid, this] { return clearing != hoid; });
256 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
257 pending_wbs.find(hoid);
258 if (i == pending_wbs.end())
259 return;
260
261 cur_ios -= i->second.first.ios;
262 logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios);
263 cur_size -= i->second.first.size;
264 logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size);
265 logger->dec(l_wbthrottle_inodes_dirtied);
266
267 pending_wbs.erase(i);
268 remove_object(hoid);
269 cond.notify_all();
270 }
271
272 void WBThrottle::throttle()
273 {
274 std::unique_lock l{lock};
275 cond.wait(l, [this] { return stopping || !need_flush(); });
276 }