#include "msg/Messenger.h"
#include "osdc/Journaler.h"
#include "common/errno.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "common/Finisher.h"
#define dout_subsys ceph_subsys_journaler
#define dout_prefix *_dout << objecter->messenger->get_myname() \
<< ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
+using namespace std;
using std::chrono::seconds;
class Journaler::C_DelayFlush : public Context {
Journaler *journaler;
public:
- C_DelayFlush(Journaler *j) : journaler(j) {}
+ explicit C_DelayFlush(Journaler *j) : journaler(j) {}
void finish(int r) override {
journaler->_do_delayed_flush();
}
{
lock_guard lk(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
state = STATE_ACTIVE;
stream_format = sf;
// prefetch intelligently.
// (watch out, this is big if you use big objects or weird striping)
- uint64_t periods = cct->_conf->journaler_prefetch_periods;
- if (periods < 2)
- periods = 2; // we need at least 2 periods to make progress.
+ uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
fetch_len = layout.get_period() * periods;
}
}
ldout(cct, 1) << "recover start" << dendl;
- assert(state != STATE_ACTIVE);
- assert(readonly);
+ ceph_assert(state != STATE_ACTIVE);
+ ceph_assert(readonly);
if (onread)
waitfor_recover.push_back(wrap_finisher(onread));
void Journaler::_read_head(Context *on_finish, bufferlist *bl)
{
// lock is locked
- assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
+ ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
void Journaler::_reread_head(Context *onfinish)
{
ldout(cct, 10) << "reread_head" << dendl;
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
state = STATE_REREADHEAD;
C_RereadHead *fin = new C_RereadHead(this, onfinish);
}
//read on-disk header into
- assert(bl.length() || r < 0 );
+ ceph_assert(bl.length() || r < 0 );
// unpack header
if (r == 0) {
Header h;
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
- ::decode(h, p);
+ decode(h, p);
} catch (const buffer::error &e) {
finish->complete(-EINVAL);
return;
if (is_stopping())
return;
- assert(state == STATE_READHEAD);
+ ceph_assert(state == STATE_READHEAD);
if (r!=0) {
ldout(cct, 0) << "error getting journal off disk" << dendl;
// unpack header
bool corrupt = false;
Header h;
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
- ::decode(h, p);
+ decode(h, p);
if (h.magic != magic) {
ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
{
// lock is locked
ldout(cct, 1) << "probing for end of the log" << dendl;
- assert(state == STATE_PROBING || state == STATE_REPROBING);
+ ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
write_pos, end, true, 0, wrap_finisher(finish));
void Journaler::_reprobe(C_OnFinisher *finish)
{
ldout(cct, 10) << "reprobe" << dendl;
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
state = STATE_REPROBING;
C_ReProbe *fin = new C_ReProbe(this, finish);
return;
}
- assert(new_end >= write_pos || r < 0);
+ ceph_assert(new_end >= write_pos || r < 0);
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
<< " (header had " << write_pos << ")."
<< dendl;
if (is_stopping())
return;
- assert(state == STATE_PROBING);
+ ceph_assert(state == STATE_PROBING);
if (r < 0) { // error in probing
goto out;
}
<< write_pos << "). log was empty. recovered." << dendl;
ceph_abort(); // hrm.
} else {
- assert(end >= write_pos);
+ ceph_assert(end >= write_pos);
ldout(cct, 1) << "_finish_probe_end write_pos = " << end
<< " (header had " << write_pos << "). recovered."
<< dendl;
{
lock_guard l(lock);
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
_reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
}
return;
}
- assert(!r); //if we get an error, we're boned
+ // Let the caller know that the operation has failed or was intentionally
+ // failed since the caller has been blocklisted.
+ if (r == -EBLOCKLISTED) {
+ onfinish->complete(r);
+ return;
+ }
+
+ ceph_assert(!r); //if we get an error, we're boned
_reprobe(onfinish);
}
void Journaler::_write_head(Context *oncommit)
{
- assert(!readonly);
- assert(state == STATE_ACTIVE);
+ ceph_assert(!readonly);
+ ceph_assert(state == STATE_ACTIVE);
last_written.trimmed_pos = trimmed_pos;
last_written.expire_pos = expire_pos;
last_written.unused_field = expire_pos;
ldout(cct, 10) << "write_head " << last_written << dendl;
// Avoid persisting bad pointers in case of bugs
- assert(last_written.write_pos >= last_written.expire_pos);
- assert(last_written.expire_pos >= last_written.trimmed_pos);
+ ceph_assert(last_written.write_pos >= last_written.expire_pos);
+ ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);
last_wrote_head = ceph::real_clock::now();
bufferlist bl;
- ::encode(last_written, bl);
+ encode(last_written, bl);
SnapContext snapc;
object_t oid = file_object_t(ino, 0);
handle_write_error(r);
return;
}
- assert(!readonly);
+ ceph_assert(!readonly);
ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
last_committed = wrote;
if (oncommit) {
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
if (r < 0) {
lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
return;
}
- assert(start < flush_pos);
+ ceph_assert(start < flush_pos);
// calc latency?
if (logger) {
// adjust safe_pos
auto it = pending_safe.find(start);
- assert(it != pending_safe.end());
+ ceph_assert(it != pending_safe.end());
+ uint64_t min_next_safe_pos = pending_safe.begin()->second;
pending_safe.erase(it);
if (pending_safe.empty())
safe_pos = next_safe_pos;
else
- safe_pos = pending_safe.begin()->second;
+ safe_pos = min_next_safe_pos;
ldout(cct, 10) << "_finish_flush safe from " << start
<< ", pending_safe " << pending_safe
{
unique_lock l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
uint32_t s = bl.length();
// append
// flush previous object?
uint64_t su = get_layout_period();
- assert(su > 0);
+ ceph_assert(su > 0);
uint64_t write_off = write_pos % su;
uint64_t write_obj = write_pos / su;
uint64_t flush_obj = flush_pos / su;
return;
if (write_pos == flush_pos)
return;
- assert(write_pos > flush_pos);
- assert(!readonly);
+ ceph_assert(write_pos > flush_pos);
+ ceph_assert(!readonly);
// flush
uint64_t len = write_pos - flush_pos;
- assert(len == write_buf.length());
+ ceph_assert(len == write_buf.length());
if (amount && amount < len)
len = amount;
ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
<< " already too close to prezero_pos " << prezero_pos
<< ", zeroing first" << dendl;
- waiting_for_zero = true;
+ waiting_for_zero_pos = flush_pos + len;
return;
}
if (static_cast<uint64_t>(newlen) < len) {
ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
<< " but hit prezero_pos " << prezero_pos
<< ", will do " << flush_pos << "~" << newlen << dendl;
+ waiting_for_zero_pos = flush_pos + len;
len = newlen;
- } else {
- waiting_for_zero = false;
}
- } else {
- waiting_for_zero = false;
}
ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
wrap_finisher(onsafe), write_iohint);
flush_pos += len;
- assert(write_buf.length() == write_pos - flush_pos);
+ ceph_assert(write_buf.length() == write_pos - flush_pos);
write_buf_throttle.put(len);
ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
{
lock_guard l(lock);
if (is_stopping()) {
- onsafe->complete(-EAGAIN);
+ if (onsafe)
+ onsafe->complete(-EAGAIN);
return;
}
_wait_for_flush(onsafe);
void Journaler::_wait_for_flush(Context *onsafe)
{
- assert(!readonly);
+ ceph_assert(!readonly);
// all flushed and safe?
if (write_pos == safe_pos) {
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
ldout(cct, 10)
<< "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
"pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
{
lock_guard l(lock);
if (is_stopping()) {
- onsafe->complete(-EAGAIN);
+ if (onsafe)
+ onsafe->complete(-EAGAIN);
return;
}
_flush(wrap_finisher(onsafe));
void Journaler::_flush(C_OnFinisher *onsafe)
{
- assert(!readonly);
+ ceph_assert(!readonly);
if (write_pos == flush_pos) {
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
"flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
<< ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
bool Journaler::_write_head_needed()
{
- return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
+ return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval")) // true once more than the configured interval (in seconds) separates last_wrote_head from now
< ceph::real_clock::now();
}
void Journaler::_issue_prezero()
{
- assert(prezeroing_pos >= flush_pos);
-
- // we need to zero at least two periods, minimum, to ensure that we
- // have a full empty object/period in front of us.
- uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
+ ceph_assert(prezeroing_pos >= flush_pos);
+ uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
/*
* issue zero requests based on write_pos, even though the invariant
* is that we zero ahead of flush_pos.
return;
}
- assert(r == 0 || r == -ENOENT);
+ ceph_assert(r == 0 || r == -ENOENT);
if (start == prezero_pos) {
prezero_pos += len;
pending_zero.erase(b);
}
- if (waiting_for_zero) {
- _do_flush();
+ if (waiting_for_zero_pos > flush_pos) {
+ _do_flush(waiting_for_zero_pos - flush_pos);
+ }
+
+ if (prezero_pos == prezeroing_pos &&
+ !waitfor_prezero.empty()) {
+ list<Context*> ls;
+ ls.swap(waitfor_prezero);
+ finish_contexts(cct, ls, 0);
}
} else {
pending_zero.insert(start, len);
<< dendl;
}
+void Journaler::wait_for_prezero(Context *onfinish) // complete onfinish with 0 once prezero_pos == prezeroing_pos
+{
+ ceph_assert(onfinish); // a callback is mandatory here
+ lock_guard l(lock);
+
+ if (prezero_pos == prezeroing_pos) {
+ finisher->queue(onfinish, 0); // already caught up: complete through the finisher instead of inline
+ return;
+ }
+ waitfor_prezero.push_back(wrap_finisher(onfinish)); // otherwise park it on waitfor_prezero until the two positions meet
+}
/***************** READING *******************/
<< p->second.length() << dendl;
received_pos += p->second.length();
read_buf.claim_append(p->second);
- assert(received_pos <= requested_pos);
+ ceph_assert(received_pos <= requested_pos);
prefetch_buf.erase(p);
got_any = true;
}
if (got_any) {
ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
- << read_buf.length() << ", read pointers " << read_pos
- << "/" << received_pos << "/" << requested_pos
+ << read_buf.length() << ", read pointers read_pos=" << read_pos
+ << " received_pos=" << received_pos << " requested_pos=" << requested_pos
<< dendl;
// Update readability (this will also hit any decode errors resulting
// from bad data)
- readable = _is_readable();
+ readable = _have_next_entry();
}
if ((got_any && !was_readable && readable) || read_pos == write_pos) {
{
// stuck at safe_pos? (this is needed if we are reading the tail of
// a journal we are also writing to)
- assert(requested_pos <= safe_pos);
+ ceph_assert(requested_pos <= safe_pos);
if (requested_pos == safe_pos) {
ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
<< ", waiting" << dendl;
- assert(write_pos > requested_pos);
+ ceph_assert(write_pos > requested_pos);
if (pending_safe.empty()) {
_flush(NULL);
}
// go.
ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
- << ", read pointers " << read_pos << "/" << received_pos
- << "/" << (requested_pos+len) << dendl;
+ << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
+ << " requested_pos+len=" << (requested_pos+len) << dendl;
// step by period (object). _don't_ do a single big filer.read()
// here because it will wait for all object reads to complete before
/*
- * _is_readable() - return true if next entry is ready.
+ * _have_next_entry() - return true if next entry is ready.
*/
-bool Journaler::_is_readable()
+bool Journaler::_have_next_entry()
{
// anything to read?
if (read_pos == write_pos)
return true;
}
- ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+ ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
<< ", but need " << need << " for next entry; fetch_len is "
<< fetch_len << dendl;
// partial fragment at the end?
if (received_pos == write_pos) {
- ldout(cct, 10) << "is_readable() detected partial entry at tail, "
+ ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
"adjusting write_pos to " << read_pos << dendl;
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
- assert(write_buf.length() == 0);
- assert(waitfor_safe.empty());
+ ceph_assert(write_buf.length() == 0);
+ ceph_assert(waitfor_safe.empty());
// reset read state
requested_pos = received_pos = read_pos;
if (need > fetch_len) {
temp_fetch_len = need;
- ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+ ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
<< dendl;
}
- ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
+ ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
return false;
}
bool Journaler::is_readable()
{
lock_guard l(lock);
+ return _is_readable(); // locking wrapper: `lock` is taken above, then the check is delegated to _is_readable()
+}
+bool Journaler::_is_readable()
+{
if (error != 0) {
return false;
}
try {
consumed = journal_stream.read(read_buf, &bl, &start_ptr);
if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
- assert(start_ptr == read_pos);
+ ceph_assert(start_ptr == read_pos);
}
} catch (const buffer::error &e) {
lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
read_pos += consumed;
try {
// We were readable, we might not be any more
- readable = _is_readable();
+ readable = _have_next_entry();
} catch (const buffer::error &e) {
- lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+ lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
error = -EINVAL;
return false;
}
// prefetch?
_prefetch();
+
+ // If bufferlist consists of discontiguous memory, decoding types whose
+ // denc_traits needs contiguous memory is inefficient. The bufferlist may
+ // get copied to temporary memory multiple times (copy_shallow() in
+ // src/include/denc.h actually does deep copy)
+ if (bl.get_num_buffers() > 1)
+ bl.rebuild();
return true;
}
void Journaler::wait_for_readable(Context *onreadable)
{
lock_guard l(lock);
+ _wait_for_readable(onreadable); // locking wrapper: `lock` is taken above, then onreadable is handed to _wait_for_readable()
+}
+
+void Journaler::_wait_for_readable(Context *onreadable)
+{
if (is_stopping()) {
finisher->queue(onreadable, -EAGAIN);
return;
}
- assert(on_readable == 0);
+ ceph_assert(on_readable == 0);
if (!readable) {
ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
<< onreadable << dendl;
if (is_stopping())
return;
- assert(!readonly);
+ ceph_assert(!readonly);
uint64_t period = get_layout_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
}
// trim
- assert(trim_to <= write_pos);
- assert(trim_to <= expire_pos);
- assert(trim_to > trimming_pos);
+ ceph_assert(trim_to <= write_pos);
+ ceph_assert(trim_to <= expire_pos);
+ ceph_assert(trim_to > trimming_pos);
ldout(cct, 10) << "trim trimming to " << trim_to
<< ", trimmed/trimming/expire are "
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
{
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
<< ", trimmed/trimming/expire now "
<< to << "/" << trimming_pos << "/" << expire_pos
return;
}
- assert(r >= 0 || r == -ENOENT);
+ ceph_assert(r >= 0 || r == -ENOENT);
- assert(to <= trimming_pos);
- assert(to > trimmed_pos);
+ ceph_assert(to <= trimming_pos);
+ ceph_assert(to > trimmed_pos);
trimmed_pos = to;
}
lderr(cct) << __func__ << ": multiple write errors, handler already called"
<< dendl;
} else {
- assert(0 == "unhandled write error");
+ ceph_abort_msg("unhandled write error");
}
}
*/
bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
{
- assert(need != NULL);
+ ceph_assert(need != NULL);
uint32_t entry_size = 0;
uint64_t entry_sentinel = 0;
- bufferlist::iterator p = read_buf.begin();
+ auto p = read_buf.cbegin();
// Do we have enough data to decode an entry prefix?
if (format >= JOURNAL_FORMAT_RESILIENT) {
}
if (read_buf.length() >= *need) {
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::decode(entry_sentinel, p);
+ decode(entry_sentinel, p);
if (entry_sentinel != sentinel) {
throw buffer::malformed_input("Invalid sentinel");
}
}
- ::decode(entry_size, p);
+ decode(entry_size, p);
} else {
return false;
}
size_t JournalStream::read(bufferlist &from, bufferlist *entry,
uint64_t *start_ptr)
{
- assert(start_ptr != NULL);
- assert(entry != NULL);
- assert(entry->length() == 0);
+ ceph_assert(start_ptr != NULL);
+ ceph_assert(entry != NULL);
+ ceph_assert(entry->length() == 0);
uint32_t entry_size = 0;
// Consume envelope prefix: entry_size and entry_sentinel
- bufferlist::iterator from_ptr = from.begin();
+ auto from_ptr = from.cbegin();
if (format >= JOURNAL_FORMAT_RESILIENT) {
uint64_t entry_sentinel = 0;
- ::decode(entry_sentinel, from_ptr);
+ decode(entry_sentinel, from_ptr);
// Assertion instead of clean check because of precondition of this
// fn is that readable() already passed
- assert(entry_sentinel == sentinel);
+ ceph_assert(entry_sentinel == sentinel);
}
- ::decode(entry_size, from_ptr);
+ decode(entry_size, from_ptr);
// Read out the payload
from_ptr.copy(entry_size, *entry);
// Consume the envelope suffix (start_ptr)
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::decode(*start_ptr, from_ptr);
+ decode(*start_ptr, from_ptr);
} else {
*start_ptr = 0;
}
size_t JournalStream::write(bufferlist &entry, bufferlist *to,
uint64_t const &start_ptr)
{
- assert(to != NULL);
+ ceph_assert(to != NULL);
uint32_t const entry_size = entry.length();
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::encode(sentinel, *to);
+ encode(sentinel, *to);
}
- ::encode(entry_size, *to);
+ encode(entry_size, *to);
to->claim_append(entry);
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::encode(start_ptr, *to);
+ encode(start_ptr, *to);
}
if (format >= JOURNAL_FORMAT_RESILIENT) {
*/
void Journaler::set_write_error_handler(Context *c) {
lock_guard l(lock);
- assert(!on_write_error);
+ ceph_assert(!on_write_error); // a handler may only be installed while none is set
on_write_error = wrap_finisher(c);
called_write_error = false;
}
waitfor_safe.clear();
}
+void Journaler::check_isreadable() // loop, waiting on a C_SaferCond each pass, until readable, read_pos reaches write_pos, or an error is reported
+{
+ std::unique_lock l(lock);
+ while (!_is_readable() && // keep waiting only while: not readable yet,
+ get_read_pos() < get_write_pos() && // read_pos is still behind write_pos,
+ !get_error()) { // and get_error() reports nothing
+ C_SaferCond readable_waiter;
+ _wait_for_readable(&readable_waiter); // register the waiter while `lock` is held
+ l.unlock(); // drop `lock` for the duration of the wait
+ readable_waiter.wait(); // NOTE(review): result is discarded; _wait_for_readable queues -EAGAIN when stopping -- confirm the loop cannot spin in that state
+ l.lock(); // re-acquire before re-evaluating the loop condition
+ }
+ return ;
+}