1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "os/filestore/WBThrottle.h"
7 #include "common/perf_counters.h"
8 #include "common/errno.h"
10 WBThrottle::WBThrottle(CephContext
*cct
) :
11 cur_ios(0), cur_size(0),
18 std::lock_guard l
{lock
};
22 PerfCountersBuilder
b(
23 cct
, string("WBThrottle"),
24 l_wbthrottle_first
, l_wbthrottle_last
);
25 b
.add_u64(l_wbthrottle_bytes_dirtied
, "bytes_dirtied", "Dirty data", NULL
, 0, unit_t(UNIT_BYTES
));
26 b
.add_u64(l_wbthrottle_bytes_wb
, "bytes_wb", "Written data", NULL
, 0, unit_t(UNIT_BYTES
));
27 b
.add_u64(l_wbthrottle_ios_dirtied
, "ios_dirtied", "Dirty operations");
28 b
.add_u64(l_wbthrottle_ios_wb
, "ios_wb", "Written operations");
29 b
.add_u64(l_wbthrottle_inodes_dirtied
, "inodes_dirtied", "Entries waiting for write");
30 b
.add_u64(l_wbthrottle_inodes_wb
, "inodes_wb", "Written entries");
31 logger
= b
.create_perf_counters();
32 cct
->get_perfcounters_collection()->add(logger
);
33 for (unsigned i
= l_wbthrottle_first
+ 1; i
!= l_wbthrottle_last
; ++i
)
36 cct
->_conf
.add_observer(this);
39 WBThrottle::~WBThrottle() {
41 cct
->get_perfcounters_collection()->remove(logger
);
43 cct
->_conf
.remove_observer(this);
46 void WBThrottle::start()
49 std::lock_guard l
{lock
};
52 create("wb_throttle");
55 void WBThrottle::stop()
58 std::lock_guard l
{lock
};
66 const char** WBThrottle::get_tracked_conf_keys() const
68 static const char* KEYS
[] = {
69 "filestore_wbthrottle_btrfs_bytes_start_flusher",
70 "filestore_wbthrottle_btrfs_bytes_hard_limit",
71 "filestore_wbthrottle_btrfs_ios_start_flusher",
72 "filestore_wbthrottle_btrfs_ios_hard_limit",
73 "filestore_wbthrottle_btrfs_inodes_start_flusher",
74 "filestore_wbthrottle_btrfs_inodes_hard_limit",
75 "filestore_wbthrottle_xfs_bytes_start_flusher",
76 "filestore_wbthrottle_xfs_bytes_hard_limit",
77 "filestore_wbthrottle_xfs_ios_start_flusher",
78 "filestore_wbthrottle_xfs_ios_hard_limit",
79 "filestore_wbthrottle_xfs_inodes_start_flusher",
80 "filestore_wbthrottle_xfs_inodes_hard_limit",
86 void WBThrottle::set_from_conf()
88 ceph_assert(ceph_mutex_is_locked(lock
));
91 cct
->_conf
->filestore_wbthrottle_btrfs_bytes_start_flusher
;
93 cct
->_conf
->filestore_wbthrottle_btrfs_bytes_hard_limit
;
95 cct
->_conf
->filestore_wbthrottle_btrfs_ios_start_flusher
;
97 cct
->_conf
->filestore_wbthrottle_btrfs_ios_hard_limit
;
99 cct
->_conf
->filestore_wbthrottle_btrfs_inodes_start_flusher
;
101 cct
->_conf
->filestore_wbthrottle_btrfs_inodes_hard_limit
;
102 } else if (fs
== XFS
) {
104 cct
->_conf
->filestore_wbthrottle_xfs_bytes_start_flusher
;
106 cct
->_conf
->filestore_wbthrottle_xfs_bytes_hard_limit
;
108 cct
->_conf
->filestore_wbthrottle_xfs_ios_start_flusher
;
110 cct
->_conf
->filestore_wbthrottle_xfs_ios_hard_limit
;
112 cct
->_conf
->filestore_wbthrottle_xfs_inodes_start_flusher
;
114 cct
->_conf
->filestore_wbthrottle_xfs_inodes_hard_limit
;
116 ceph_abort_msg("invalid value for fs");
121 void WBThrottle::handle_conf_change(const ConfigProxy
& conf
,
122 const std::set
<std::string
> &changed
)
124 std::lock_guard l
{lock
};
125 for (const char** i
= get_tracked_conf_keys(); *i
; ++i
) {
126 if (changed
.count(*i
)) {
133 bool WBThrottle::get_next_should_flush(
134 std::unique_lock
<ceph::mutex
>& locker
,
135 boost::tuple
<ghobject_t
, FDRef
, PendingWB
> *next
)
137 ceph_assert(ceph_mutex_is_locked(lock
));
140 cond
.wait(locker
, [this] {
141 return stopping
|| (beyond_limit() && !pending_wbs
.empty());
146 ceph_assert(!pending_wbs
.empty());
147 ghobject_t
obj(pop_object());
149 ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator i
=
150 pending_wbs
.find(obj
);
151 *next
= boost::make_tuple(obj
, i
->second
.second
, i
->second
.first
);
152 pending_wbs
.erase(i
);
157 void *WBThrottle::entry()
159 std::unique_lock l
{lock
};
160 boost::tuple
<ghobject_t
, FDRef
, PendingWB
> wb
;
161 while (get_next_should_flush(l
, &wb
)) {
162 clearing
= wb
.get
<0>();
163 cur_ios
-= wb
.get
<2>().ios
;
164 logger
->dec(l_wbthrottle_ios_dirtied
, wb
.get
<2>().ios
);
165 logger
->inc(l_wbthrottle_ios_wb
, wb
.get
<2>().ios
);
166 cur_size
-= wb
.get
<2>().size
;
167 logger
->dec(l_wbthrottle_bytes_dirtied
, wb
.get
<2>().size
);
168 logger
->inc(l_wbthrottle_bytes_wb
, wb
.get
<2>().size
);
169 logger
->dec(l_wbthrottle_inodes_dirtied
);
170 logger
->inc(l_wbthrottle_inodes_wb
);
172 #if defined(HAVE_FDATASYNC)
173 int r
= ::fdatasync(**wb
.get
<1>());
175 int r
= ::fsync(**wb
.get
<1>());
178 lderr(cct
) << "WBThrottle fsync failed: " << cpp_strerror(errno
) << dendl
;
181 #ifdef HAVE_POSIX_FADVISE
182 if (cct
->_conf
->filestore_fadvise
&& wb
.get
<2>().nocache
) {
183 int fa_r
= posix_fadvise(**wb
.get
<1>(), 0, 0, POSIX_FADV_DONTNEED
);
184 ceph_assert(fa_r
== 0);
188 clearing
= ghobject_t();
190 wb
= boost::tuple
<ghobject_t
, FDRef
, PendingWB
>();
195 void WBThrottle::queue_wb(
196 FDRef fd
, const ghobject_t
&hoid
, uint64_t offset
, uint64_t len
,
199 std::lock_guard l
{lock
};
200 ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator wbiter
=
201 pending_wbs
.find(hoid
);
202 if (wbiter
== pending_wbs
.end()) {
203 wbiter
= pending_wbs
.insert(
208 logger
->inc(l_wbthrottle_inodes_dirtied
);
214 logger
->inc(l_wbthrottle_ios_dirtied
);
216 logger
->inc(l_wbthrottle_bytes_dirtied
, len
);
218 wbiter
->second
.first
.add(nocache
, len
, 1);
224 void WBThrottle::clear()
226 std::lock_guard l
{lock
};
227 for (ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator i
=
229 i
!= pending_wbs
.end();
231 #ifdef HAVE_POSIX_FADVISE
232 if (cct
->_conf
->filestore_fadvise
&& i
->second
.first
.nocache
) {
233 int fa_r
= posix_fadvise(**i
->second
.second
, 0, 0, POSIX_FADV_DONTNEED
);
234 ceph_assert(fa_r
== 0);
239 cur_ios
= cur_size
= 0;
240 logger
->set(l_wbthrottle_ios_dirtied
, 0);
241 logger
->set(l_wbthrottle_bytes_dirtied
, 0);
242 logger
->set(l_wbthrottle_inodes_dirtied
, 0);
249 void WBThrottle::clear_object(const ghobject_t
&hoid
)
251 std::unique_lock l
{lock
};
252 cond
.wait(l
, [hoid
, this] { return clearing
!= hoid
; });
253 ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator i
=
254 pending_wbs
.find(hoid
);
255 if (i
== pending_wbs
.end())
258 cur_ios
-= i
->second
.first
.ios
;
259 logger
->dec(l_wbthrottle_ios_dirtied
, i
->second
.first
.ios
);
260 cur_size
-= i
->second
.first
.size
;
261 logger
->dec(l_wbthrottle_bytes_dirtied
, i
->second
.first
.size
);
262 logger
->dec(l_wbthrottle_inodes_dirtied
);
264 pending_wbs
.erase(i
);
269 void WBThrottle::throttle()
271 std::unique_lock l
{lock
};
272 cond
.wait(l
, [this] { return stopping
|| !need_flush(); });