1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "os/filestore/WBThrottle.h"
7 #include "common/perf_counters.h"
8 #include "common/errno.h"
13 WBThrottle::WBThrottle(CephContext
*cct
) :
14 cur_ios(0), cur_size(0),
21 std::lock_guard l
{lock
};
25 PerfCountersBuilder
b(
26 cct
, string("WBThrottle"),
27 l_wbthrottle_first
, l_wbthrottle_last
);
28 b
.add_u64(l_wbthrottle_bytes_dirtied
, "bytes_dirtied", "Dirty data", NULL
, 0, unit_t(UNIT_BYTES
));
29 b
.add_u64(l_wbthrottle_bytes_wb
, "bytes_wb", "Written data", NULL
, 0, unit_t(UNIT_BYTES
));
30 b
.add_u64(l_wbthrottle_ios_dirtied
, "ios_dirtied", "Dirty operations");
31 b
.add_u64(l_wbthrottle_ios_wb
, "ios_wb", "Written operations");
32 b
.add_u64(l_wbthrottle_inodes_dirtied
, "inodes_dirtied", "Entries waiting for write");
33 b
.add_u64(l_wbthrottle_inodes_wb
, "inodes_wb", "Written entries");
34 logger
= b
.create_perf_counters();
35 cct
->get_perfcounters_collection()->add(logger
);
36 for (unsigned i
= l_wbthrottle_first
+ 1; i
!= l_wbthrottle_last
; ++i
)
39 cct
->_conf
.add_observer(this);
42 WBThrottle::~WBThrottle() {
44 cct
->get_perfcounters_collection()->remove(logger
);
46 cct
->_conf
.remove_observer(this);
49 void WBThrottle::start()
52 std::lock_guard l
{lock
};
55 create("wb_throttle");
58 void WBThrottle::stop()
61 std::lock_guard l
{lock
};
69 const char** WBThrottle::get_tracked_conf_keys() const
71 static const char* KEYS
[] = {
72 "filestore_wbthrottle_btrfs_bytes_start_flusher",
73 "filestore_wbthrottle_btrfs_bytes_hard_limit",
74 "filestore_wbthrottle_btrfs_ios_start_flusher",
75 "filestore_wbthrottle_btrfs_ios_hard_limit",
76 "filestore_wbthrottle_btrfs_inodes_start_flusher",
77 "filestore_wbthrottle_btrfs_inodes_hard_limit",
78 "filestore_wbthrottle_xfs_bytes_start_flusher",
79 "filestore_wbthrottle_xfs_bytes_hard_limit",
80 "filestore_wbthrottle_xfs_ios_start_flusher",
81 "filestore_wbthrottle_xfs_ios_hard_limit",
82 "filestore_wbthrottle_xfs_inodes_start_flusher",
83 "filestore_wbthrottle_xfs_inodes_hard_limit",
89 void WBThrottle::set_from_conf()
91 ceph_assert(ceph_mutex_is_locked(lock
));
94 cct
->_conf
->filestore_wbthrottle_btrfs_bytes_start_flusher
;
96 cct
->_conf
->filestore_wbthrottle_btrfs_bytes_hard_limit
;
98 cct
->_conf
->filestore_wbthrottle_btrfs_ios_start_flusher
;
100 cct
->_conf
->filestore_wbthrottle_btrfs_ios_hard_limit
;
102 cct
->_conf
->filestore_wbthrottle_btrfs_inodes_start_flusher
;
104 cct
->_conf
->filestore_wbthrottle_btrfs_inodes_hard_limit
;
105 } else if (fs
== XFS
) {
107 cct
->_conf
->filestore_wbthrottle_xfs_bytes_start_flusher
;
109 cct
->_conf
->filestore_wbthrottle_xfs_bytes_hard_limit
;
111 cct
->_conf
->filestore_wbthrottle_xfs_ios_start_flusher
;
113 cct
->_conf
->filestore_wbthrottle_xfs_ios_hard_limit
;
115 cct
->_conf
->filestore_wbthrottle_xfs_inodes_start_flusher
;
117 cct
->_conf
->filestore_wbthrottle_xfs_inodes_hard_limit
;
119 ceph_abort_msg("invalid value for fs");
124 void WBThrottle::handle_conf_change(const ConfigProxy
& conf
,
125 const std::set
<std::string
> &changed
)
127 std::lock_guard l
{lock
};
128 for (const char** i
= get_tracked_conf_keys(); *i
; ++i
) {
129 if (changed
.count(*i
)) {
136 bool WBThrottle::get_next_should_flush(
137 std::unique_lock
<ceph::mutex
>& locker
,
138 boost::tuple
<ghobject_t
, FDRef
, PendingWB
> *next
)
140 ceph_assert(ceph_mutex_is_locked(lock
));
143 cond
.wait(locker
, [this] {
144 return stopping
|| (beyond_limit() && !pending_wbs
.empty());
149 ceph_assert(!pending_wbs
.empty());
150 ghobject_t
obj(pop_object());
152 ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator i
=
153 pending_wbs
.find(obj
);
154 *next
= boost::make_tuple(obj
, i
->second
.second
, i
->second
.first
);
155 pending_wbs
.erase(i
);
160 void *WBThrottle::entry()
162 std::unique_lock l
{lock
};
163 boost::tuple
<ghobject_t
, FDRef
, PendingWB
> wb
;
164 while (get_next_should_flush(l
, &wb
)) {
165 clearing
= wb
.get
<0>();
166 cur_ios
-= wb
.get
<2>().ios
;
167 logger
->dec(l_wbthrottle_ios_dirtied
, wb
.get
<2>().ios
);
168 logger
->inc(l_wbthrottle_ios_wb
, wb
.get
<2>().ios
);
169 cur_size
-= wb
.get
<2>().size
;
170 logger
->dec(l_wbthrottle_bytes_dirtied
, wb
.get
<2>().size
);
171 logger
->inc(l_wbthrottle_bytes_wb
, wb
.get
<2>().size
);
172 logger
->dec(l_wbthrottle_inodes_dirtied
);
173 logger
->inc(l_wbthrottle_inodes_wb
);
175 #if defined(HAVE_FDATASYNC)
176 int r
= ::fdatasync(**wb
.get
<1>());
178 int r
= ::fsync(**wb
.get
<1>());
181 lderr(cct
) << "WBThrottle fsync failed: " << cpp_strerror(errno
) << dendl
;
184 #ifdef HAVE_POSIX_FADVISE
185 if (cct
->_conf
->filestore_fadvise
&& wb
.get
<2>().nocache
) {
186 int fa_r
= posix_fadvise(**wb
.get
<1>(), 0, 0, POSIX_FADV_DONTNEED
);
187 ceph_assert(fa_r
== 0);
191 clearing
= ghobject_t();
193 wb
= boost::tuple
<ghobject_t
, FDRef
, PendingWB
>();
198 void WBThrottle::queue_wb(
199 FDRef fd
, const ghobject_t
&hoid
, uint64_t offset
, uint64_t len
,
202 std::lock_guard l
{lock
};
203 ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator wbiter
=
204 pending_wbs
.find(hoid
);
205 if (wbiter
== pending_wbs
.end()) {
206 wbiter
= pending_wbs
.insert(
211 logger
->inc(l_wbthrottle_inodes_dirtied
);
217 logger
->inc(l_wbthrottle_ios_dirtied
);
219 logger
->inc(l_wbthrottle_bytes_dirtied
, len
);
221 wbiter
->second
.first
.add(nocache
, len
, 1);
227 void WBThrottle::clear()
229 std::lock_guard l
{lock
};
230 for (ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator i
=
232 i
!= pending_wbs
.end();
234 #ifdef HAVE_POSIX_FADVISE
235 if (cct
->_conf
->filestore_fadvise
&& i
->second
.first
.nocache
) {
236 int fa_r
= posix_fadvise(**i
->second
.second
, 0, 0, POSIX_FADV_DONTNEED
);
237 ceph_assert(fa_r
== 0);
242 cur_ios
= cur_size
= 0;
243 logger
->set(l_wbthrottle_ios_dirtied
, 0);
244 logger
->set(l_wbthrottle_bytes_dirtied
, 0);
245 logger
->set(l_wbthrottle_inodes_dirtied
, 0);
252 void WBThrottle::clear_object(const ghobject_t
&hoid
)
254 std::unique_lock l
{lock
};
255 cond
.wait(l
, [hoid
, this] { return clearing
!= hoid
; });
256 ceph::unordered_map
<ghobject_t
, pair
<PendingWB
, FDRef
> >::iterator i
=
257 pending_wbs
.find(hoid
);
258 if (i
== pending_wbs
.end())
261 cur_ios
-= i
->second
.first
.ios
;
262 logger
->dec(l_wbthrottle_ios_dirtied
, i
->second
.first
.ios
);
263 cur_size
-= i
->second
.first
.size
;
264 logger
->dec(l_wbthrottle_bytes_dirtied
, i
->second
.first
.size
);
265 logger
->dec(l_wbthrottle_inodes_dirtied
);
267 pending_wbs
.erase(i
);
272 void WBThrottle::throttle()
274 std::unique_lock l
{lock
};
275 cond
.wait(l
, [this] { return stopping
|| !need_flush(); });