X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fosdc%2FJournaler.cc;h=04b90fb5952aaf2b430fa0cd97fa0bbc4dd49192;hb=39ae355f72b1d71f2212a99f2bd9f6c1e0d35528;hp=4217ab91ea1f6c86c18d0f7fbab232e2d18d07d0;hpb=b32b81446b3b05102be0267e79203f59329c1d97;p=ceph.git diff --git a/ceph/src/osdc/Journaler.cc b/ceph/src/osdc/Journaler.cc index 4217ab91e..04b90fb59 100644 --- a/ceph/src/osdc/Journaler.cc +++ b/ceph/src/osdc/Journaler.cc @@ -18,7 +18,7 @@ #include "msg/Messenger.h" #include "osdc/Journaler.h" #include "common/errno.h" -#include "include/assert.h" +#include "include/ceph_assert.h" #include "common/Finisher.h" #define dout_subsys ceph_subsys_journaler @@ -26,13 +26,14 @@ #define dout_prefix *_dout << objecter->messenger->get_myname() \ << ".journaler." << name << (readonly ? "(ro) ":"(rw) ") +using namespace std; using std::chrono::seconds; class Journaler::C_DelayFlush : public Context { Journaler *journaler; public: - C_DelayFlush(Journaler *j) : journaler(j) {} + explicit C_DelayFlush(Journaler *j) : journaler(j) {} void finish(int r) override { journaler->_do_delayed_flush(); } @@ -58,7 +59,7 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf) { lock_guard lk(lock); - assert(!readonly); + ceph_assert(!readonly); state = STATE_ACTIVE; stream_format = sf; @@ -94,9 +95,7 @@ void Journaler::_set_layout(file_layout_t const *l) // prefetch intelligently. // (watch out, this is big if you use big objects or weird striping) - uint64_t periods = cct->_conf->journaler_prefetch_periods; - if (periods < 2) - periods = 2; // we need at least 2 periods to make progress. + uint64_t periods = cct->_conf.get_val("journaler_prefetch_periods"); fetch_len = layout.get_period() * periods; } @@ -165,8 +164,8 @@ void Journaler::recover(Context *onread) } ldout(cct, 1) << "recover start" << dendl; - assert(state != STATE_ACTIVE); - assert(readonly); + ceph_assert(state != STATE_ACTIVE); + ceph_assert(readonly); if (onread) waitfor_recover.push_back(wrap_finisher(onread)); @@ -185,7 +184,7 @@ void Journaler::recover(Context *onread) void Journaler::_read_head(Context *on_finish, bufferlist *bl) { // lock is locked - assert(state == STATE_READHEAD || state == STATE_REREADHEAD); + ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD); object_t oid = file_object_t(ino, 0); object_locator_t oloc(pg_pool); @@ -209,7 +208,7 @@ void Journaler::reread_head(Context *onfinish) void Journaler::_reread_head(Context *onfinish) { ldout(cct, 10) << "reread_head" << dendl; - assert(state == STATE_ACTIVE); + ceph_assert(state == STATE_ACTIVE); state = STATE_REREADHEAD; C_RereadHead *fin = new C_RereadHead(this, onfinish); @@ -225,14 +224,14 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) } //read on-disk header into - assert(bl.length() || r < 0 ); + ceph_assert(bl.length() || r < 0 ); // unpack header if (r == 0) { Header h; - bufferlist::iterator p = bl.begin(); + auto p = bl.cbegin(); try { - ::decode(h, p); + decode(h, p); } catch (const buffer::error &e) { finish->complete(-EINVAL); return; @@ -254,7 +253,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) if (is_stopping()) return; - assert(state == STATE_READHEAD); + ceph_assert(state == STATE_READHEAD); if (r!=0) { ldout(cct, 0) << "error getting journal off disk" << dendl; @@ -277,9 +276,9 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) // unpack header bool corrupt = false; Header h; - bufferlist::iterator p = bl.begin(); + auto p = bl.cbegin(); try { - ::decode(h, p); + decode(h, p); if (h.magic != magic) { ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '" @@ -322,7 +321,7 @@ void Journaler::_probe(Context *finish, uint64_t *end) { // lock is locked ldout(cct, 1) << "probing for end of the log" << dendl; - assert(state == STATE_PROBING || state == STATE_REPROBING); + ceph_assert(state == STATE_PROBING || state == STATE_REPROBING); // probe the log filer.probe(ino, &layout, CEPH_NOSNAP, write_pos, end, true, 0, wrap_finisher(finish)); @@ -331,7 +330,7 @@ void Journaler::_probe(Context *finish, uint64_t *end) void Journaler::_reprobe(C_OnFinisher *finish) { ldout(cct, 10) << "reprobe" << dendl; - assert(state == STATE_ACTIVE); + ceph_assert(state == STATE_ACTIVE); state = STATE_REPROBING; C_ReProbe *fin = new C_ReProbe(this, finish); @@ -348,7 +347,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, return; } - assert(new_end >= write_pos || r < 0); + ceph_assert(new_end >= write_pos || r < 0); ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." << dendl; @@ -363,7 +362,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end) if (is_stopping()) return; - assert(state == STATE_PROBING); + ceph_assert(state == STATE_PROBING); if (r < 0) { // error in probing goto out; } @@ -373,7 +372,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end) << write_pos << "). log was empty. recovered." << dendl; ceph_abort(); // hrm. } else { - assert(end >= write_pos); + ceph_assert(end >= write_pos); ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had " << write_pos << "). recovered." << dendl; @@ -406,7 +405,7 @@ void Journaler::reread_head_and_probe(Context *onfinish) { lock_guard l(lock); - assert(state == STATE_ACTIVE); + ceph_assert(state == STATE_ACTIVE); _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish))); } @@ -419,7 +418,14 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) return; } - assert(!r); //if we get an error, we're boned + // Let the caller know that the operation has failed or was intentionally + // failed since the caller has been blocklisted. + if (r == -EBLOCKLISTED) { + onfinish->complete(r); + return; + } + + ceph_assert(!r); //if we get an error, we're boned _reprobe(onfinish); } @@ -447,8 +453,8 @@ void Journaler::write_head(Context *oncommit) void Journaler::_write_head(Context *oncommit) { - assert(!readonly); - assert(state == STATE_ACTIVE); + ceph_assert(!readonly); + ceph_assert(state == STATE_ACTIVE); last_written.trimmed_pos = trimmed_pos; last_written.expire_pos = expire_pos; last_written.unused_field = expire_pos; @@ -457,13 +463,13 @@ void Journaler::_write_head(Context *oncommit) ldout(cct, 10) << "write_head " << last_written << dendl; // Avoid persisting bad pointers in case of bugs - assert(last_written.write_pos >= last_written.expire_pos); - assert(last_written.expire_pos >= last_written.trimmed_pos); + ceph_assert(last_written.write_pos >= last_written.expire_pos); + ceph_assert(last_written.expire_pos >= last_written.trimmed_pos); last_wrote_head = ceph::real_clock::now(); bufferlist bl; - ::encode(last_written, bl); + encode(last_written, bl); SnapContext snapc; object_t oid = file_object_t(ino, 0); @@ -485,7 +491,7 @@ void Journaler::_finish_write_head(int r, Header &wrote, handle_write_error(r); return; } - assert(!readonly); + ceph_assert(!readonly); ldout(cct, 10) << "_finish_write_head " << wrote << dendl; last_committed = wrote; if (oncommit) { @@ -513,7 +519,7 @@ public: void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) { lock_guard l(lock); - assert(!readonly); + ceph_assert(!readonly); if (r < 0) { lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl; @@ -521,7 +527,7 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) return; } - assert(start < flush_pos); + ceph_assert(start < flush_pos); // calc latency? if (logger) { @@ -531,12 +537,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) // adjust safe_pos auto it = pending_safe.find(start); - assert(it != pending_safe.end()); + ceph_assert(it != pending_safe.end()); + uint64_t min_next_safe_pos = pending_safe.begin()->second; pending_safe.erase(it); if (pending_safe.empty()) safe_pos = next_safe_pos; else - safe_pos = pending_safe.begin()->second; + safe_pos = min_next_safe_pos; ldout(cct, 10) << "_finish_flush safe from " << start << ", pending_safe " << pending_safe @@ -565,7 +572,7 @@ uint64_t Journaler::append_entry(bufferlist& bl) { unique_lock l(lock); - assert(!readonly); + ceph_assert(!readonly); uint32_t s = bl.length(); // append @@ -585,7 +592,7 @@ uint64_t Journaler::append_entry(bufferlist& bl) // flush previous object? uint64_t su = get_layout_period(); - assert(su > 0); + ceph_assert(su > 0); uint64_t write_off = write_pos % su; uint64_t write_obj = write_pos / su; uint64_t flush_obj = flush_pos / su; @@ -613,12 +620,12 @@ void Journaler::_do_flush(unsigned amount) return; if (write_pos == flush_pos) return; - assert(write_pos > flush_pos); - assert(!readonly); + ceph_assert(write_pos > flush_pos); + ceph_assert(!readonly); // flush uint64_t len = write_pos - flush_pos; - assert(len == write_buf.length()); + ceph_assert(len == write_buf.length()); if (amount && amount < len) len = amount; @@ -633,19 +640,16 @@ void Journaler::_do_flush(unsigned amount) ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " already too close to prezero_pos " << prezero_pos << ", zeroing first" << dendl; - waiting_for_zero = true; + waiting_for_zero_pos = flush_pos + len; return; } if (static_cast(newlen) < len) { ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos << ", will do " << flush_pos << "~" << newlen << dendl; + waiting_for_zero_pos = flush_pos + len; len = newlen; - } else { - waiting_for_zero = false; } - } else { - waiting_for_zero = false; } ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl; @@ -684,7 +688,7 @@ void Journaler::_do_flush(unsigned amount) wrap_finisher(onsafe), write_iohint); flush_pos += len; - assert(write_buf.length() == write_pos - flush_pos); + ceph_assert(write_buf.length() == write_pos - flush_pos); write_buf_throttle.put(len); ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl; @@ -701,7 +705,8 @@ void Journaler::wait_for_flush(Context *onsafe) { lock_guard l(lock); if (is_stopping()) { - onsafe->complete(-EAGAIN); + if (onsafe) + onsafe->complete(-EAGAIN); return; } _wait_for_flush(onsafe); @@ -709,11 +714,11 @@ void Journaler::wait_for_flush(Context *onsafe) void Journaler::_wait_for_flush(Context *onsafe) { - assert(!readonly); + ceph_assert(!readonly); // all flushed and safe? if (write_pos == safe_pos) { - assert(write_buf.length() == 0); + ceph_assert(write_buf.length() == 0); ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe " "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" @@ -734,7 +739,8 @@ void Journaler::flush(Context *onsafe) { lock_guard l(lock); if (is_stopping()) { - onsafe->complete(-EAGAIN); + if (onsafe) + onsafe->complete(-EAGAIN); return; } _flush(wrap_finisher(onsafe)); @@ -742,10 +748,10 @@ void Journaler::flush(Context *onsafe) void Journaler::_flush(C_OnFinisher *onsafe) { - assert(!readonly); + ceph_assert(!readonly); if (write_pos == flush_pos) { - assert(write_buf.length() == 0); + ceph_assert(write_buf.length() == 0); ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/" "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos @@ -766,7 +772,7 @@ void Journaler::_flush(C_OnFinisher *onsafe) bool Journaler::_write_head_needed() { - return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval) + return last_wrote_head + seconds(cct->_conf.get_val("journaler_write_head_interval")) < ceph::real_clock::now(); } @@ -785,12 +791,9 @@ struct C_Journaler_Prezero : public Context { void Journaler::_issue_prezero() { - assert(prezeroing_pos >= flush_pos); - - // we need to zero at least two periods, minimum, to ensure that we - // have a full empty object/period in front of us. - uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods); + ceph_assert(prezeroing_pos >= flush_pos); + uint64_t num_periods = cct->_conf.get_val("journaler_prezero_periods"); /* * issue zero requests based on write_pos, even though the invariant * is that we zero ahead of flush_pos. @@ -842,7 +845,7 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) return; } - assert(r == 0 || r == -ENOENT); + ceph_assert(r == 0 || r == -ENOENT); if (start == prezero_pos) { prezero_pos += len; @@ -853,8 +856,15 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) pending_zero.erase(b); } - if (waiting_for_zero) { - _do_flush(); + if (waiting_for_zero_pos > flush_pos) { + _do_flush(waiting_for_zero_pos - flush_pos); + } + + if (prezero_pos == prezeroing_pos && + !waitfor_prezero.empty()) { + list ls; + ls.swap(waitfor_prezero); + finish_contexts(cct, ls, 0); } } else { pending_zero.insert(start, len); @@ -865,6 +875,17 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) << dendl; } +void Journaler::wait_for_prezero(Context *onfinish) +{ + ceph_assert(onfinish); + lock_guard l(lock); + + if (prezero_pos == prezeroing_pos) { + finisher->queue(onfinish, 0); + return; + } + waitfor_prezero.push_back(wrap_finisher(onfinish)); +} /***************** READING *******************/ @@ -957,20 +978,20 @@ void Journaler::_assimilate_prefetch() << p->second.length() << dendl; received_pos += p->second.length(); read_buf.claim_append(p->second); - assert(received_pos <= requested_pos); + ceph_assert(received_pos <= requested_pos); prefetch_buf.erase(p); got_any = true; } if (got_any) { ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" - << read_buf.length() << ", read pointers " << read_pos - << "/" << received_pos << "/" << requested_pos + << read_buf.length() << ", read pointers read_pos=" << read_pos + << " received_pos=" << received_pos << " requested_pos=" << requested_pos << dendl; // Update readability (this will also hit any decode errors resulting // from bad data) - readable = _is_readable(); + readable = _have_next_entry(); } if ((got_any && !was_readable && readable) || read_pos == write_pos) { @@ -990,11 +1011,11 @@ void Journaler::_issue_read(uint64_t len) { // stuck at safe_pos? (this is needed if we are reading the tail of // a journal we are also writing to) - assert(requested_pos <= safe_pos); + ceph_assert(requested_pos <= safe_pos); if (requested_pos == safe_pos) { ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; - assert(write_pos > requested_pos); + ceph_assert(write_pos > requested_pos); if (pending_safe.empty()) { _flush(NULL); } @@ -1020,8 +1041,8 @@ void Journaler::_issue_read(uint64_t len) // go. ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len - << ", read pointers " << read_pos << "/" << received_pos - << "/" << (requested_pos+len) << dendl; + << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos + << " requested_pos+len=" << (requested_pos+len) << dendl; // step by period (object). _don't_ do a single big filer.read() // here because it will wait for all object reads to complete before @@ -1094,9 +1115,9 @@ void Journaler::_prefetch() /* - * _is_readable() - return true if next entry is ready. + * _have_next_entry() - return true if next entry is ready. */ -bool Journaler::_is_readable() +bool Journaler::_have_next_entry() { // anything to read? if (read_pos == write_pos) @@ -1108,19 +1129,19 @@ bool Journaler::_is_readable() return true; } - ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length() + ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length() << ", but need " << need << " for next entry; fetch_len is " << fetch_len << dendl; // partial fragment at the end? if (received_pos == write_pos) { - ldout(cct, 10) << "is_readable() detected partial entry at tail, " + ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, " "adjusting write_pos to " << read_pos << dendl; // adjust write_pos prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos; - assert(write_buf.length() == 0); - assert(waitfor_safe.empty()); + ceph_assert(write_buf.length() == 0); + ceph_assert(waitfor_safe.empty()); // reset read state requested_pos = received_pos = read_pos; @@ -1133,11 +1154,11 @@ bool Journaler::_is_readable() if (need > fetch_len) { temp_fetch_len = need; - ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len + ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len << dendl; } - ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl; + ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl; return false; } @@ -1147,7 +1168,11 @@ bool Journaler::_is_readable() bool Journaler::is_readable() { lock_guard l(lock); + return _is_readable(); +} +bool Journaler::_is_readable() +{ if (error != 0) { return false; } @@ -1228,7 +1253,7 @@ bool Journaler::try_read_entry(bufferlist& bl) try { consumed = journal_stream.read(read_buf, &bl, &start_ptr); if (stream_format >= JOURNAL_FORMAT_RESILIENT) { - assert(start_ptr == read_pos); + ceph_assert(start_ptr == read_pos); } } catch (const buffer::error &e) { lderr(cct) << __func__ << ": decode error from journal_stream" << dendl; @@ -1243,27 +1268,39 @@ bool Journaler::try_read_entry(bufferlist& bl) read_pos += consumed; try { // We were readable, we might not be any more - readable = _is_readable(); + readable = _have_next_entry(); } catch (const buffer::error &e) { - lderr(cct) << __func__ << ": decode error from _is_readable" << dendl; + lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl; error = -EINVAL; return false; } // prefetch? _prefetch(); + + // If bufferlist consists of discontiguous memory, decoding types whose + // denc_traits needs contiguous memory is inefficient. The bufferlist may + // get copied to temporary memory multiple times (copy_shallow() in + // src/include/denc.h actually does deep copy) + if (bl.get_num_buffers() > 1) + bl.rebuild(); return true; } void Journaler::wait_for_readable(Context *onreadable) { lock_guard l(lock); + _wait_for_readable(onreadable); +} + +void Journaler::_wait_for_readable(Context *onreadable) +{ if (is_stopping()) { finisher->queue(onreadable, -EAGAIN); return; } - assert(on_readable == 0); + ceph_assert(on_readable == 0); if (!readable) { ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl; @@ -1306,7 +1343,7 @@ void Journaler::_trim() if (is_stopping()) return; - assert(!readonly); + ceph_assert(!readonly); uint64_t period = get_layout_period(); uint64_t trim_to = last_committed.expire_pos; trim_to -= trim_to % period; @@ -1326,9 +1363,9 @@ void Journaler::_trim() } // trim - assert(trim_to <= write_pos); - assert(trim_to <= expire_pos); - assert(trim_to > trimming_pos); + ceph_assert(trim_to <= write_pos); + ceph_assert(trim_to <= expire_pos); + ceph_assert(trim_to > trimming_pos); ldout(cct, 10) << "trim trimming to " << trim_to << ", trimmed/trimming/expire are " << trimmed_pos << "/" << trimming_pos << "/" << expire_pos @@ -1348,7 +1385,7 @@ void Journaler::_finish_trim(int r, uint64_t to) { lock_guard l(lock); - assert(!readonly); + ceph_assert(!readonly); ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos << ", trimmed/trimming/expire now " << to << "/" << trimming_pos << "/" << expire_pos @@ -1359,10 +1396,10 @@ void Journaler::_finish_trim(int r, uint64_t to) return; } - assert(r >= 0 || r == -ENOENT); + ceph_assert(r >= 0 || r == -ENOENT); - assert(to <= trimming_pos); - assert(to > trimmed_pos); + ceph_assert(to <= trimming_pos); + ceph_assert(to > trimmed_pos); trimmed_pos = to; } @@ -1382,7 +1419,7 @@ void Journaler::handle_write_error(int r) lderr(cct) << __func__ << ": multiple write errors, handler already called" << dendl; } else { - assert(0 == "unhandled write error"); + ceph_abort_msg("unhandled write error"); } } @@ -1397,11 +1434,11 @@ void Journaler::handle_write_error(int r) */ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const { - assert(need != NULL); + ceph_assert(need != NULL); uint32_t entry_size = 0; uint64_t entry_sentinel = 0; - bufferlist::iterator p = read_buf.begin(); + auto p = read_buf.cbegin(); // Do we have enough data to decode an entry prefix? if (format >= JOURNAL_FORMAT_RESILIENT) { @@ -1411,13 +1448,13 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const } if (read_buf.length() >= *need) { if (format >= JOURNAL_FORMAT_RESILIENT) { - ::decode(entry_sentinel, p); + decode(entry_sentinel, p); if (entry_sentinel != sentinel) { throw buffer::malformed_input("Invalid sentinel"); } } - ::decode(entry_size, p); + decode(entry_size, p); } else { return false; } @@ -1454,29 +1491,29 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const size_t JournalStream::read(bufferlist &from, bufferlist *entry, uint64_t *start_ptr) { - assert(start_ptr != NULL); - assert(entry != NULL); - assert(entry->length() == 0); + ceph_assert(start_ptr != NULL); + ceph_assert(entry != NULL); + ceph_assert(entry->length() == 0); uint32_t entry_size = 0; // Consume envelope prefix: entry_size and entry_sentinel - bufferlist::iterator from_ptr = from.begin(); + auto from_ptr = from.cbegin(); if (format >= JOURNAL_FORMAT_RESILIENT) { uint64_t entry_sentinel = 0; - ::decode(entry_sentinel, from_ptr); + decode(entry_sentinel, from_ptr); // Assertion instead of clean check because of precondition of this // fn is that readable() already passed - assert(entry_sentinel == sentinel); + ceph_assert(entry_sentinel == sentinel); } - ::decode(entry_size, from_ptr); + decode(entry_size, from_ptr); // Read out the payload from_ptr.copy(entry_size, *entry); // Consume the envelope suffix (start_ptr) if (format >= JOURNAL_FORMAT_RESILIENT) { - ::decode(*start_ptr, from_ptr); + decode(*start_ptr, from_ptr); } else { *start_ptr = 0; } @@ -1494,16 +1531,16 @@ size_t JournalStream::read(bufferlist &from, bufferlist *entry, size_t JournalStream::write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr) { - assert(to != NULL); + ceph_assert(to != NULL); uint32_t const entry_size = entry.length(); if (format >= JOURNAL_FORMAT_RESILIENT) { - ::encode(sentinel, *to); + encode(sentinel, *to); } - ::encode(entry_size, *to); + encode(entry_size, *to); to->claim_append(entry); if (format >= JOURNAL_FORMAT_RESILIENT) { - ::encode(start_ptr, *to); + encode(start_ptr, *to); } if (format >= JOURNAL_FORMAT_RESILIENT) { @@ -1529,7 +1566,7 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to, */ void Journaler::set_write_error_handler(Context *c) { lock_guard l(lock); - assert(!on_write_error); + ceph_assert(!on_write_error); on_write_error = wrap_finisher(c); called_write_error = false; } @@ -1578,3 +1615,17 @@ void Journaler::shutdown() waitfor_safe.clear(); } +void Journaler::check_isreadable() +{ + std::unique_lock l(lock); + while (!_is_readable() && + get_read_pos() < get_write_pos() && + !get_error()) { + C_SaferCond readable_waiter; + _wait_for_readable(&readable_waiter); + l.unlock(); + readable_waiter.wait(); + l.lock(); + } + return ; +}