]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Journaler.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / osdc / Journaler.cc
index 4217ab91ea1f6c86c18d0f7fbab232e2d18d07d0..04b90fb5952aaf2b430fa0cd97fa0bbc4dd49192 100644 (file)
@@ -18,7 +18,7 @@
 #include "msg/Messenger.h"
 #include "osdc/Journaler.h"
 #include "common/errno.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "common/Finisher.h"
 
 #define dout_subsys ceph_subsys_journaler
 #define dout_prefix *_dout << objecter->messenger->get_myname() \
   << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
 
+using namespace std;
 using std::chrono::seconds;
 
 
 class Journaler::C_DelayFlush : public Context {
   Journaler *journaler;
   public:
-  C_DelayFlush(Journaler *j) : journaler(j) {}
+  explicit C_DelayFlush(Journaler *j) : journaler(j) {}
   void finish(int r) override {
     journaler->_do_delayed_flush();
   }
@@ -58,7 +59,7 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf)
 {
   lock_guard lk(lock);
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   state = STATE_ACTIVE;
 
   stream_format = sf;
@@ -94,9 +95,7 @@ void Journaler::_set_layout(file_layout_t const *l)
 
   // prefetch intelligently.
   // (watch out, this is big if you use big objects or weird striping)
-  uint64_t periods = cct->_conf->journaler_prefetch_periods;
-  if (periods < 2)
-    periods = 2;  // we need at least 2 periods to make progress.
+  uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
   fetch_len = layout.get_period() * periods;
 }
 
@@ -165,8 +164,8 @@ void Journaler::recover(Context *onread)
   }
 
   ldout(cct, 1) << "recover start" << dendl;
-  assert(state != STATE_ACTIVE);
-  assert(readonly);
+  ceph_assert(state != STATE_ACTIVE);
+  ceph_assert(readonly);
 
   if (onread)
     waitfor_recover.push_back(wrap_finisher(onread));
@@ -185,7 +184,7 @@ void Journaler::recover(Context *onread)
 void Journaler::_read_head(Context *on_finish, bufferlist *bl)
 {
   // lock is locked
-  assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
+  ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
 
   object_t oid = file_object_t(ino, 0);
   object_locator_t oloc(pg_pool);
@@ -209,7 +208,7 @@ void Journaler::reread_head(Context *onfinish)
 void Journaler::_reread_head(Context *onfinish)
 {
   ldout(cct, 10) << "reread_head" << dendl;
-  assert(state == STATE_ACTIVE);
+  ceph_assert(state == STATE_ACTIVE);
 
   state = STATE_REREADHEAD;
   C_RereadHead *fin = new C_RereadHead(this, onfinish);
@@ -225,14 +224,14 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
   }
 
   //read on-disk header into
-  assert(bl.length() || r < 0 );
+  ceph_assert(bl.length() || r < 0 );
 
   // unpack header
   if (r == 0) {
     Header h;
-    bufferlist::iterator p = bl.begin();
+    auto p = bl.cbegin();
     try {
-      ::decode(h, p);
+      decode(h, p);
     } catch (const buffer::error &e) {
       finish->complete(-EINVAL);
       return;
@@ -254,7 +253,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   if (is_stopping())
     return;
 
-  assert(state == STATE_READHEAD);
+  ceph_assert(state == STATE_READHEAD);
 
   if (r!=0) {
     ldout(cct, 0) << "error getting journal off disk" << dendl;
@@ -277,9 +276,9 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   // unpack header
   bool corrupt = false;
   Header h;
-  bufferlist::iterator p = bl.begin();
+  auto p = bl.cbegin();
   try {
-    ::decode(h, p);
+    decode(h, p);
 
     if (h.magic != magic) {
       ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
@@ -322,7 +321,7 @@ void Journaler::_probe(Context *finish, uint64_t *end)
 {
   // lock is locked
   ldout(cct, 1) << "probing for end of the log" << dendl;
-  assert(state == STATE_PROBING || state == STATE_REPROBING);
+  ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
   // probe the log
   filer.probe(ino, &layout, CEPH_NOSNAP,
              write_pos, end, true, 0, wrap_finisher(finish));
@@ -331,7 +330,7 @@ void Journaler::_probe(Context *finish, uint64_t *end)
 void Journaler::_reprobe(C_OnFinisher *finish)
 {
   ldout(cct, 10) << "reprobe" << dendl;
-  assert(state == STATE_ACTIVE);
+  ceph_assert(state == STATE_ACTIVE);
 
   state = STATE_REPROBING;
   C_ReProbe *fin = new C_ReProbe(this, finish);
@@ -348,7 +347,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
     return;
   }
 
-  assert(new_end >= write_pos || r < 0);
+  ceph_assert(new_end >= write_pos || r < 0);
   ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
          << " (header had " << write_pos << ")."
          << dendl;
@@ -363,7 +362,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
   if (is_stopping())
     return;
 
-  assert(state == STATE_PROBING);
+  ceph_assert(state == STATE_PROBING);
   if (r < 0) { // error in probing
     goto out;
   }
@@ -373,7 +372,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
                  << write_pos << "). log was empty. recovered." << dendl;
     ceph_abort(); // hrm.
   } else {
-    assert(end >= write_pos);
+    ceph_assert(end >= write_pos);
     ldout(cct, 1) << "_finish_probe_end write_pos = " << end
                  << " (header had " << write_pos << "). recovered."
                  << dendl;
@@ -406,7 +405,7 @@ void Journaler::reread_head_and_probe(Context *onfinish)
 {
   lock_guard l(lock);
 
-  assert(state == STATE_ACTIVE);
+  ceph_assert(state == STATE_ACTIVE);
   _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
 }
 
@@ -419,7 +418,14 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
     return;
   }
 
-  assert(!r); //if we get an error, we're boned
+  // Let the caller know that the operation has failed or was intentionally
+  // failed since the caller has been blocklisted.
+  if (r == -EBLOCKLISTED) {
+    onfinish->complete(r);
+    return;
+  }
+
+  ceph_assert(!r); //if we get an error, we're boned
   _reprobe(onfinish);
 }
 
@@ -447,8 +453,8 @@ void Journaler::write_head(Context *oncommit)
 
 void Journaler::_write_head(Context *oncommit)
 {
-  assert(!readonly);
-  assert(state == STATE_ACTIVE);
+  ceph_assert(!readonly);
+  ceph_assert(state == STATE_ACTIVE);
   last_written.trimmed_pos = trimmed_pos;
   last_written.expire_pos = expire_pos;
   last_written.unused_field = expire_pos;
@@ -457,13 +463,13 @@ void Journaler::_write_head(Context *oncommit)
   ldout(cct, 10) << "write_head " << last_written << dendl;
 
   // Avoid persisting bad pointers in case of bugs
-  assert(last_written.write_pos >= last_written.expire_pos);
-  assert(last_written.expire_pos >= last_written.trimmed_pos);
+  ceph_assert(last_written.write_pos >= last_written.expire_pos);
+  ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);
 
   last_wrote_head = ceph::real_clock::now();
 
   bufferlist bl;
-  ::encode(last_written, bl);
+  encode(last_written, bl);
   SnapContext snapc;
 
   object_t oid = file_object_t(ino, 0);
@@ -485,7 +491,7 @@ void Journaler::_finish_write_head(int r, Header &wrote,
     handle_write_error(r);
     return;
   }
-  assert(!readonly);
+  ceph_assert(!readonly);
   ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
   last_committed = wrote;
   if (oncommit) {
@@ -513,7 +519,7 @@ public:
 void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
 {
   lock_guard l(lock);
-  assert(!readonly);
+  ceph_assert(!readonly);
 
   if (r < 0) {
     lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
@@ -521,7 +527,7 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
     return;
   }
 
-  assert(start < flush_pos);
+  ceph_assert(start < flush_pos);
 
   // calc latency?
   if (logger) {
@@ -531,12 +537,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
 
   // adjust safe_pos
   auto it = pending_safe.find(start);
-  assert(it != pending_safe.end());
+  ceph_assert(it != pending_safe.end());
+  uint64_t min_next_safe_pos = pending_safe.begin()->second;
   pending_safe.erase(it);
   if (pending_safe.empty())
     safe_pos = next_safe_pos;
   else
-    safe_pos = pending_safe.begin()->second;
+    safe_pos = min_next_safe_pos;
 
   ldout(cct, 10) << "_finish_flush safe from " << start
                 << ", pending_safe " << pending_safe
@@ -565,7 +572,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 {
   unique_lock l(lock);
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   uint32_t s = bl.length();
 
   // append
@@ -585,7 +592,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 
   // flush previous object?
   uint64_t su = get_layout_period();
-  assert(su > 0);
+  ceph_assert(su > 0);
   uint64_t write_off = write_pos % su;
   uint64_t write_obj = write_pos / su;
   uint64_t flush_obj = flush_pos / su;
@@ -613,12 +620,12 @@ void Journaler::_do_flush(unsigned amount)
     return;
   if (write_pos == flush_pos)
     return;
-  assert(write_pos > flush_pos);
-  assert(!readonly);
+  ceph_assert(write_pos > flush_pos);
+  ceph_assert(!readonly);
 
   // flush
   uint64_t len = write_pos - flush_pos;
-  assert(len == write_buf.length());
+  ceph_assert(len == write_buf.length());
   if (amount && amount < len)
     len = amount;
 
@@ -633,19 +640,16 @@ void Journaler::_do_flush(unsigned amount)
       ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " already too close to prezero_pos " << prezero_pos
                     << ", zeroing first" << dendl;
-      waiting_for_zero = true;
+      waiting_for_zero_pos = flush_pos + len;
       return;
     }
     if (static_cast<uint64_t>(newlen) < len) {
       ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " but hit prezero_pos " << prezero_pos
                     << ", will do " << flush_pos << "~" << newlen << dendl;
+      waiting_for_zero_pos = flush_pos + len;
       len = newlen;
-    } else {
-      waiting_for_zero = false;
     }
-  } else {
-    waiting_for_zero = false;
   }
   ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
 
@@ -684,7 +688,7 @@ void Journaler::_do_flush(unsigned amount)
              wrap_finisher(onsafe), write_iohint);
 
   flush_pos += len;
-  assert(write_buf.length() == write_pos - flush_pos);
+  ceph_assert(write_buf.length() == write_pos - flush_pos);
   write_buf_throttle.put(len);
   ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
  
@@ -701,7 +705,8 @@ void Journaler::wait_for_flush(Context *onsafe)
 {
   lock_guard l(lock);
   if (is_stopping()) {
-    onsafe->complete(-EAGAIN);
+    if (onsafe)
+      onsafe->complete(-EAGAIN);
     return;
   }
   _wait_for_flush(onsafe);
@@ -709,11 +714,11 @@ void Journaler::wait_for_flush(Context *onsafe)
 
 void Journaler::_wait_for_flush(Context *onsafe)
 {
-  assert(!readonly);
+  ceph_assert(!readonly);
 
   // all flushed and safe?
   if (write_pos == safe_pos) {
-    assert(write_buf.length() == 0);
+    ceph_assert(write_buf.length() == 0);
     ldout(cct, 10)
       << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
       "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
@@ -734,7 +739,8 @@ void Journaler::flush(Context *onsafe)
 {
   lock_guard l(lock);
   if (is_stopping()) {
-    onsafe->complete(-EAGAIN);
+    if (onsafe)
+      onsafe->complete(-EAGAIN);
     return;
   }
   _flush(wrap_finisher(onsafe));
@@ -742,10 +748,10 @@ void Journaler::flush(Context *onsafe)
 
 void Journaler::_flush(C_OnFinisher *onsafe)
 {
-  assert(!readonly);
+  ceph_assert(!readonly);
 
   if (write_pos == flush_pos) {
-    assert(write_buf.length() == 0);
+    ceph_assert(write_buf.length() == 0);
     ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
       "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
                   << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
@@ -766,7 +772,7 @@ void Journaler::_flush(C_OnFinisher *onsafe)
 
 bool Journaler::_write_head_needed()
 {
-  return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
+  return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval"))
       < ceph::real_clock::now();
 }
 
@@ -785,12 +791,9 @@ struct C_Journaler_Prezero : public Context {
 
 void Journaler::_issue_prezero()
 {
-  assert(prezeroing_pos >= flush_pos);
-
-  // we need to zero at least two periods, minimum, to ensure that we
-  // have a full empty object/period in front of us.
-  uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
+  ceph_assert(prezeroing_pos >= flush_pos);
 
+  uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
   /*
    * issue zero requests based on write_pos, even though the invariant
    * is that we zero ahead of flush_pos.
@@ -842,7 +845,7 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
     return;
   }
 
-  assert(r == 0 || r == -ENOENT);
+  ceph_assert(r == 0 || r == -ENOENT);
 
   if (start == prezero_pos) {
     prezero_pos += len;
@@ -853,8 +856,15 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
       pending_zero.erase(b);
     }
 
-    if (waiting_for_zero) {
-      _do_flush();
+    if (waiting_for_zero_pos > flush_pos) {
+      _do_flush(waiting_for_zero_pos - flush_pos);
+    }
+
+    if (prezero_pos == prezeroing_pos &&
+       !waitfor_prezero.empty()) {
+      list<Context*> ls;
+      ls.swap(waitfor_prezero);
+      finish_contexts(cct, ls, 0);
     }
   } else {
     pending_zero.insert(start, len);
@@ -865,6 +875,17 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
                 << dendl;
 }
 
+void Journaler::wait_for_prezero(Context *onfinish)
+{
+  ceph_assert(onfinish);
+  lock_guard l(lock);
+
+  if (prezero_pos == prezeroing_pos) {
+    finisher->queue(onfinish, 0);
+    return;
+  }
+  waitfor_prezero.push_back(wrap_finisher(onfinish));
+}
 
 
 /***************** READING *******************/
@@ -957,20 +978,20 @@ void Journaler::_assimilate_prefetch()
                   << p->second.length() << dendl;
     received_pos += p->second.length();
     read_buf.claim_append(p->second);
-    assert(received_pos <= requested_pos);
+    ceph_assert(received_pos <= requested_pos);
     prefetch_buf.erase(p);
     got_any = true;
   }
 
   if (got_any) {
     ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
-                  << read_buf.length() << ", read pointers " << read_pos
-                  << "/" << received_pos << "/" << requested_pos
+                  << read_buf.length() << ", read pointers read_pos=" << read_pos 
+                   << " received_pos=" << received_pos << " requested_pos=" << requested_pos
                   << dendl;
 
     // Update readability (this will also hit any decode errors resulting
     // from bad data)
-    readable = _is_readable();
+    readable = _have_next_entry();
   }
 
   if ((got_any && !was_readable && readable) || read_pos == write_pos) {
@@ -990,11 +1011,11 @@ void Journaler::_issue_read(uint64_t len)
 {
   // stuck at safe_pos?  (this is needed if we are reading the tail of
   // a journal we are also writing to)
-  assert(requested_pos <= safe_pos);
+  ceph_assert(requested_pos <= safe_pos);
   if (requested_pos == safe_pos) {
     ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
                   << ", waiting" << dendl;
-    assert(write_pos > requested_pos);
+    ceph_assert(write_pos > requested_pos);
     if (pending_safe.empty()) {
       _flush(NULL);
     }
@@ -1020,8 +1041,8 @@ void Journaler::_issue_read(uint64_t len)
 
   // go.
   ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
-                << ", read pointers " << read_pos << "/" << received_pos
-                << "/" << (requested_pos+len) << dendl;
+                << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
+                << " requested_pos+len=" << (requested_pos+len) << dendl;
 
   // step by period (object).  _don't_ do a single big filer.read()
   // here because it will wait for all object reads to complete before
@@ -1094,9 +1115,9 @@ void Journaler::_prefetch()
 
 
 /*
- * _is_readable() - return true if next entry is ready.
+ * _have_next_entry() - return true if next entry is ready.
  */
-bool Journaler::_is_readable()
+bool Journaler::_have_next_entry()
 {
   // anything to read?
   if (read_pos == write_pos)
@@ -1108,19 +1129,19 @@ bool Journaler::_is_readable()
     return true;
   }
 
-  ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
+  ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
                  << ", but need " << need << " for next entry; fetch_len is "
                  << fetch_len << dendl;
 
   // partial fragment at the end?
   if (received_pos == write_pos) {
-    ldout(cct, 10) << "is_readable() detected partial entry at tail, "
+    ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
       "adjusting write_pos to " << read_pos << dendl;
 
     // adjust write_pos
     prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
-    assert(write_buf.length() == 0);
-    assert(waitfor_safe.empty());
+    ceph_assert(write_buf.length() == 0);
+    ceph_assert(waitfor_safe.empty());
 
     // reset read state
     requested_pos = received_pos = read_pos;
@@ -1133,11 +1154,11 @@ bool Journaler::_is_readable()
 
   if (need > fetch_len) {
     temp_fetch_len = need;
-    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+    ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
                   << dendl;
   }
 
-  ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
+  ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
   return false;
 }
 
@@ -1147,7 +1168,11 @@ bool Journaler::_is_readable()
 bool Journaler::is_readable()
 {
   lock_guard l(lock);
+  return _is_readable();
+}
 
+bool Journaler::_is_readable()
+{
   if (error != 0) {
     return false;
   }
@@ -1228,7 +1253,7 @@ bool Journaler::try_read_entry(bufferlist& bl)
   try {
     consumed = journal_stream.read(read_buf, &bl, &start_ptr);
     if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
-      assert(start_ptr == read_pos);
+      ceph_assert(start_ptr == read_pos);
     }
   } catch (const buffer::error &e) {
     lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
@@ -1243,27 +1268,39 @@ bool Journaler::try_read_entry(bufferlist& bl)
   read_pos += consumed;
   try {
     // We were readable, we might not be any more
-    readable = _is_readable();
+    readable = _have_next_entry();
   } catch (const buffer::error &e) {
-    lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
+    lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
     error = -EINVAL;
     return false;
   }
 
   // prefetch?
   _prefetch();
+
+  // If bufferlist consists of discontiguous memory, decoding types whose
+  // denc_traits needs contiguous memory is inefficient. The bufferlist may
+  // get copied to temporary memory multiple times (copy_shallow() in
+  // src/include/denc.h actually does deep copy)
+  if (bl.get_num_buffers() > 1)
+    bl.rebuild();
   return true;
 }
 
 void Journaler::wait_for_readable(Context *onreadable)
 {
   lock_guard l(lock);
+  _wait_for_readable(onreadable); 
+}
+
+void Journaler::_wait_for_readable(Context *onreadable)
+{
   if (is_stopping()) {
     finisher->queue(onreadable, -EAGAIN);
     return;
   }
 
-  assert(on_readable == 0);
+  ceph_assert(on_readable == 0);
   if (!readable) {
     ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
                   << onreadable << dendl;
@@ -1306,7 +1343,7 @@ void Journaler::_trim()
   if (is_stopping())
     return;
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   uint64_t period = get_layout_period();
   uint64_t trim_to = last_committed.expire_pos;
   trim_to -= trim_to % period;
@@ -1326,9 +1363,9 @@ void Journaler::_trim()
   }
 
   // trim
-  assert(trim_to <= write_pos);
-  assert(trim_to <= expire_pos);
-  assert(trim_to > trimming_pos);
+  ceph_assert(trim_to <= write_pos);
+  ceph_assert(trim_to <= expire_pos);
+  ceph_assert(trim_to > trimming_pos);
   ldout(cct, 10) << "trim trimming to " << trim_to
                 << ", trimmed/trimming/expire are "
                 << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
@@ -1348,7 +1385,7 @@ void Journaler::_finish_trim(int r, uint64_t to)
 {
   lock_guard l(lock);
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
           << ", trimmed/trimming/expire now "
           << to << "/" << trimming_pos << "/" << expire_pos
@@ -1359,10 +1396,10 @@ void Journaler::_finish_trim(int r, uint64_t to)
     return;
   }
 
-  assert(r >= 0 || r == -ENOENT);
+  ceph_assert(r >= 0 || r == -ENOENT);
 
-  assert(to <= trimming_pos);
-  assert(to > trimmed_pos);
+  ceph_assert(to <= trimming_pos);
+  ceph_assert(to > trimmed_pos);
   trimmed_pos = to;
 }
 
@@ -1382,7 +1419,7 @@ void Journaler::handle_write_error(int r)
     lderr(cct) << __func__ << ": multiple write errors, handler already called"
               << dendl;
   } else {
-    assert(0 == "unhandled write error");
+    ceph_abort_msg("unhandled write error");
   }
 }
 
@@ -1397,11 +1434,11 @@ void Journaler::handle_write_error(int r)
  */
 bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
 {
-  assert(need != NULL);
+  ceph_assert(need != NULL);
 
   uint32_t entry_size = 0;
   uint64_t entry_sentinel = 0;
-  bufferlist::iterator p = read_buf.begin();
+  auto p = read_buf.cbegin();
 
   // Do we have enough data to decode an entry prefix?
   if (format >= JOURNAL_FORMAT_RESILIENT) {
@@ -1411,13 +1448,13 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
   }
   if (read_buf.length() >= *need) {
     if (format >= JOURNAL_FORMAT_RESILIENT) {
-      ::decode(entry_sentinel, p);
+      decode(entry_sentinel, p);
       if (entry_sentinel != sentinel) {
        throw buffer::malformed_input("Invalid sentinel");
       }
     }
 
-    ::decode(entry_size, p);
+    decode(entry_size, p);
   } else {
     return false;
   }
@@ -1454,29 +1491,29 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
 size_t JournalStream::read(bufferlist &from, bufferlist *entry,
                           uint64_t *start_ptr)
 {
-  assert(start_ptr != NULL);
-  assert(entry != NULL);
-  assert(entry->length() == 0);
+  ceph_assert(start_ptr != NULL);
+  ceph_assert(entry != NULL);
+  ceph_assert(entry->length() == 0);
 
   uint32_t entry_size = 0;
 
   // Consume envelope prefix: entry_size and entry_sentinel
-  bufferlist::iterator from_ptr = from.begin();
+  auto from_ptr = from.cbegin();
   if (format >= JOURNAL_FORMAT_RESILIENT) {
     uint64_t entry_sentinel = 0;
-    ::decode(entry_sentinel, from_ptr);
+    decode(entry_sentinel, from_ptr);
     // Assertion instead of clean check because of precondition of this
     // fn is that readable() already passed
-    assert(entry_sentinel == sentinel);
+    ceph_assert(entry_sentinel == sentinel);
   }
-  ::decode(entry_size, from_ptr);
+  decode(entry_size, from_ptr);
 
   // Read out the payload
   from_ptr.copy(entry_size, *entry);
 
   // Consume the envelope suffix (start_ptr)
   if (format >= JOURNAL_FORMAT_RESILIENT) {
-    ::decode(*start_ptr, from_ptr);
+    decode(*start_ptr, from_ptr);
   } else {
     *start_ptr = 0;
   }
@@ -1494,16 +1531,16 @@ size_t JournalStream::read(bufferlist &from, bufferlist *entry,
 size_t JournalStream::write(bufferlist &entry, bufferlist *to,
                            uint64_t const &start_ptr)
 {
-  assert(to != NULL);
+  ceph_assert(to != NULL);
 
   uint32_t const entry_size = entry.length();
   if (format >= JOURNAL_FORMAT_RESILIENT) {
-    ::encode(sentinel, *to);
+    encode(sentinel, *to);
   }
-  ::encode(entry_size, *to);
+  encode(entry_size, *to);
   to->claim_append(entry);
   if (format >= JOURNAL_FORMAT_RESILIENT) {
-    ::encode(start_ptr, *to);
+    encode(start_ptr, *to);
   }
 
   if (format >= JOURNAL_FORMAT_RESILIENT) {
@@ -1529,7 +1566,7 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to,
  */
 void Journaler::set_write_error_handler(Context *c) {
   lock_guard l(lock);
-  assert(!on_write_error);
+  ceph_assert(!on_write_error);
   on_write_error = wrap_finisher(c);
   called_write_error = false;
 }
@@ -1578,3 +1615,17 @@ void Journaler::shutdown()
   waitfor_safe.clear();
 }
 
+void Journaler::check_isreadable()
+{
+  std::unique_lock l(lock);
+  while (!_is_readable() &&
+      get_read_pos() < get_write_pos() &&
+      !get_error()) {
+    C_SaferCond readable_waiter;
+    _wait_for_readable(&readable_waiter);
+    l.unlock();
+    readable_waiter.wait();
+    l.lock();
+  }
+  return ;
+}