#define dout_prefix *_dout << objecter->messenger->get_myname() \
<< ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
+using namespace std;
using std::chrono::seconds;
}
// Let the caller know that the operation has failed or was intentionally
- // failed since the caller has been blacklisted.
- if (r == -EBLACKLISTED) {
+ // failed since the caller has been blocklisted.
+ if (r == -EBLOCKLISTED) {
onfinish->complete(r);
return;
}
{
lock_guard l(lock);
if (is_stopping()) {
- onsafe->complete(-EAGAIN);
+ if (onsafe)
+ onsafe->complete(-EAGAIN);
return;
}
_wait_for_flush(onsafe);
{
lock_guard l(lock);
if (is_stopping()) {
- onsafe->complete(-EAGAIN);
+ if (onsafe)
+ onsafe->complete(-EAGAIN);
return;
}
_flush(wrap_finisher(onsafe));
// Update readability (this will also hit any decode errors resulting
// from bad data)
- readable = _is_readable();
+ readable = _have_next_entry();
}
if ((got_any && !was_readable && readable) || read_pos == write_pos) {
/*
- * _is_readable() - return true if next entry is ready.
+ * _have_next_entry() - return true if next entry is ready.
*/
-bool Journaler::_is_readable()
+bool Journaler::_have_next_entry()
{
// anything to read?
if (read_pos == write_pos)
return true;
}
- ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+ ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
<< ", but need " << need << " for next entry; fetch_len is "
<< fetch_len << dendl;
// partial fragment at the end?
if (received_pos == write_pos) {
- ldout(cct, 10) << "is_readable() detected partial entry at tail, "
+ ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
"adjusting write_pos to " << read_pos << dendl;
// adjust write_pos
if (need > fetch_len) {
temp_fetch_len = need;
- ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+ ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
<< dendl;
}
- ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
+ ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
return false;
}
bool Journaler::is_readable()
{
lock_guard l(lock);
+ return _is_readable();
+}
+bool Journaler::_is_readable()
+{
if (error != 0) {
return false;
}
read_pos += consumed;
try {
// We were readable, we might not be any more
- readable = _is_readable();
+ readable = _have_next_entry();
} catch (const buffer::error &e) {
- lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+ lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
error = -EINVAL;
return false;
}
void Journaler::wait_for_readable(Context *onreadable)
{
lock_guard l(lock);
+ _wait_for_readable(onreadable);
+}
+
+void Journaler::_wait_for_readable(Context *onreadable)
+{
if (is_stopping()) {
finisher->queue(onreadable, -EAGAIN);
return;
waitfor_safe.clear();
}
+void Journaler::check_isreadable()
+{
+ std::unique_lock l(lock);
+ while (!_is_readable() &&
+ get_read_pos() < get_write_pos() &&
+ !get_error()) {
+ C_SaferCond readable_waiter;
+ _wait_for_readable(&readable_waiter);
+ l.unlock();
+ readable_waiter.wait();
+ l.lock();
+ }
+ return ;
+}