]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Journaler.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / osdc / Journaler.cc
index 5949ff3b4016cd4282279996f15719cd7281bdae..8084a661d7d34be5cc04986dea37cc83a27d2fb6 100644 (file)
@@ -18,7 +18,7 @@
 #include "msg/Messenger.h"
 #include "osdc/Journaler.h"
 #include "common/errno.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "common/Finisher.h"
 
 #define dout_subsys ceph_subsys_journaler
 #define dout_prefix *_dout << objecter->messenger->get_myname() \
   << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
 
+using namespace std;
 using std::chrono::seconds;
 
 
 class Journaler::C_DelayFlush : public Context {
   Journaler *journaler;
   public:
-  C_DelayFlush(Journaler *j) : journaler(j) {}
+  explicit C_DelayFlush(Journaler *j) : journaler(j) {}
   void finish(int r) override {
     journaler->_do_delayed_flush();
   }
@@ -58,7 +59,7 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf)
 {
   lock_guard lk(lock);
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   state = STATE_ACTIVE;
 
   stream_format = sf;
@@ -84,15 +85,17 @@ void Journaler::_set_layout(file_layout_t const *l)
 {
   layout = *l;
 
-  assert(layout.pool_id == pg_pool);
+  if (layout.pool_id != pg_pool) {
+    // user can reset pool id through cephfs-journal-tool
+    lderr(cct) << "may got older pool id from header layout" << dendl;
+    ceph_abort();
+  }
   last_written.layout = layout;
   last_committed.layout = layout;
 
   // prefetch intelligently.
   // (watch out, this is big if you use big objects or weird striping)
-  uint64_t periods = cct->_conf->journaler_prefetch_periods;
-  if (periods < 2)
-    periods = 2;  // we need at least 2 periods to make progress.
+  uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
   fetch_len = layout.get_period() * periods;
 }
 
@@ -155,14 +158,14 @@ public:
 void Journaler::recover(Context *onread) 
 {
   lock_guard l(lock);
-  if (stopping) {
+  if (is_stopping()) {
     onread->complete(-EAGAIN);
     return;
   }
 
   ldout(cct, 1) << "recover start" << dendl;
-  assert(state != STATE_ACTIVE);
-  assert(readonly);
+  ceph_assert(state != STATE_ACTIVE);
+  ceph_assert(readonly);
 
   if (onread)
     waitfor_recover.push_back(wrap_finisher(onread));
@@ -181,7 +184,7 @@ void Journaler::recover(Context *onread)
 void Journaler::_read_head(Context *on_finish, bufferlist *bl)
 {
   // lock is locked
-  assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
+  ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
 
   object_t oid = file_object_t(ino, 0);
   object_locator_t oloc(pg_pool);
@@ -205,7 +208,7 @@ void Journaler::reread_head(Context *onfinish)
 void Journaler::_reread_head(Context *onfinish)
 {
   ldout(cct, 10) << "reread_head" << dendl;
-  assert(state == STATE_ACTIVE);
+  ceph_assert(state == STATE_ACTIVE);
 
   state = STATE_REREADHEAD;
   C_RereadHead *fin = new C_RereadHead(this, onfinish);
@@ -215,16 +218,20 @@ void Journaler::_reread_head(Context *onfinish)
 void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    finish->complete(-EAGAIN);
+    return;
+  }
 
   //read on-disk header into
-  assert(bl.length() || r < 0 );
+  ceph_assert(bl.length() || r < 0 );
 
   // unpack header
   if (r == 0) {
     Header h;
-    bufferlist::iterator p = bl.begin();
+    auto p = bl.cbegin();
     try {
-      ::decode(h, p);
+      decode(h, p);
     } catch (const buffer::error &e) {
       finish->complete(-EINVAL);
       return;
@@ -243,8 +250,10 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 void Journaler::_finish_read_head(int r, bufferlist& bl)
 {
   lock_guard l(lock);
+  if (is_stopping())
+    return;
 
-  assert(state == STATE_READHEAD);
+  ceph_assert(state == STATE_READHEAD);
 
   if (r!=0) {
     ldout(cct, 0) << "error getting journal off disk" << dendl;
@@ -267,9 +276,9 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   // unpack header
   bool corrupt = false;
   Header h;
-  bufferlist::iterator p = bl.begin();
+  auto p = bl.cbegin();
   try {
-    ::decode(h, p);
+    decode(h, p);
 
     if (h.magic != magic) {
       ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
@@ -312,7 +321,7 @@ void Journaler::_probe(Context *finish, uint64_t *end)
 {
   // lock is locked
   ldout(cct, 1) << "probing for end of the log" << dendl;
-  assert(state == STATE_PROBING || state == STATE_REPROBING);
+  ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
   // probe the log
   filer.probe(ino, &layout, CEPH_NOSNAP,
              write_pos, end, true, 0, wrap_finisher(finish));
@@ -321,7 +330,7 @@ void Journaler::_probe(Context *finish, uint64_t *end)
 void Journaler::_reprobe(C_OnFinisher *finish)
 {
   ldout(cct, 10) << "reprobe" << dendl;
-  assert(state == STATE_ACTIVE);
+  ceph_assert(state == STATE_ACTIVE);
 
   state = STATE_REPROBING;
   C_ReProbe *fin = new C_ReProbe(this, finish);
@@ -333,8 +342,12 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
                                C_OnFinisher *onfinish)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    onfinish->complete(-EAGAIN);
+    return;
+  }
 
-  assert(new_end >= write_pos || r < 0);
+  ceph_assert(new_end >= write_pos || r < 0);
   ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
          << " (header had " << write_pos << ")."
          << dendl;
@@ -346,8 +359,10 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
 void Journaler::_finish_probe_end(int r, uint64_t end)
 {
   lock_guard l(lock);
+  if (is_stopping())
+    return;
 
-  assert(state == STATE_PROBING);
+  ceph_assert(state == STATE_PROBING);
   if (r < 0) { // error in probing
     goto out;
   }
@@ -357,7 +372,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
                  << write_pos << "). log was empty. recovered." << dendl;
     ceph_abort(); // hrm.
   } else {
-    assert(end >= write_pos);
+    ceph_assert(end >= write_pos);
     ldout(cct, 1) << "_finish_probe_end write_pos = " << end
                  << " (header had " << write_pos << "). recovered."
                  << dendl;
@@ -390,7 +405,7 @@ void Journaler::reread_head_and_probe(Context *onfinish)
 {
   lock_guard l(lock);
 
-  assert(state == STATE_ACTIVE);
+  ceph_assert(state == STATE_ACTIVE);
   _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
 }
 
@@ -398,8 +413,19 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
 {
   // Expect to be called back from finish_reread_head, which already takes lock
   // lock is locked
+  if (is_stopping()) {
+    onfinish->complete(-EAGAIN);
+    return;
+  }
 
-  assert(!r); //if we get an error, we're boned
+  // Let the caller know that the operation has failed or was intentionally
+  // failed since the caller has been blocklisted.
+  if (r == -EBLOCKLISTED) {
+    onfinish->complete(r);
+    return;
+  }
+
+  ceph_assert(!r); //if we get an error, we're boned
   _reprobe(onfinish);
 }
 
@@ -427,8 +453,8 @@ void Journaler::write_head(Context *oncommit)
 
 void Journaler::_write_head(Context *oncommit)
 {
-  assert(!readonly);
-  assert(state == STATE_ACTIVE);
+  ceph_assert(!readonly);
+  ceph_assert(state == STATE_ACTIVE);
   last_written.trimmed_pos = trimmed_pos;
   last_written.expire_pos = expire_pos;
   last_written.unused_field = expire_pos;
@@ -437,13 +463,13 @@ void Journaler::_write_head(Context *oncommit)
   ldout(cct, 10) << "write_head " << last_written << dendl;
 
   // Avoid persisting bad pointers in case of bugs
-  assert(last_written.write_pos >= last_written.expire_pos);
-  assert(last_written.expire_pos >= last_written.trimmed_pos);
+  ceph_assert(last_written.write_pos >= last_written.expire_pos);
+  ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);
 
   last_wrote_head = ceph::real_clock::now();
 
   bufferlist bl;
-  ::encode(last_written, bl);
+  encode(last_written, bl);
   SnapContext snapc;
 
   object_t oid = file_object_t(ino, 0);
@@ -465,7 +491,7 @@ void Journaler::_finish_write_head(int r, Header &wrote,
     handle_write_error(r);
     return;
   }
-  assert(!readonly);
+  ceph_assert(!readonly);
   ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
   last_committed = wrote;
   if (oncommit) {
@@ -493,7 +519,7 @@ public:
 void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
 {
   lock_guard l(lock);
-  assert(!readonly);
+  ceph_assert(!readonly);
 
   if (r < 0) {
     lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
@@ -501,7 +527,7 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
     return;
   }
 
-  assert(start < flush_pos);
+  ceph_assert(start < flush_pos);
 
   // calc latency?
   if (logger) {
@@ -511,12 +537,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
 
   // adjust safe_pos
   auto it = pending_safe.find(start);
-  assert(it != pending_safe.end());
+  ceph_assert(it != pending_safe.end());
+  uint64_t min_next_safe_pos = pending_safe.begin()->second;
   pending_safe.erase(it);
   if (pending_safe.empty())
     safe_pos = next_safe_pos;
   else
-    safe_pos = pending_safe.begin()->second;
+    safe_pos = min_next_safe_pos;
 
   ldout(cct, 10) << "_finish_flush safe from " << start
                 << ", pending_safe " << pending_safe
@@ -526,11 +553,16 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
                 << dendl;
 
   // kick waiters <= safe_pos
-  while (!waitfor_safe.empty()) {
-    if (waitfor_safe.begin()->first > safe_pos)
-      break;
-    finish_contexts(cct, waitfor_safe.begin()->second);
-    waitfor_safe.erase(waitfor_safe.begin());
+  if (!waitfor_safe.empty()) {
+    list<Context*> ls;
+    while (!waitfor_safe.empty()) {
+      auto it = waitfor_safe.begin();
+      if (it->first > safe_pos)
+       break;
+      ls.splice(ls.end(), it->second);
+      waitfor_safe.erase(it);
+    }
+    finish_contexts(cct, ls);
   }
 }
 
@@ -540,7 +572,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 {
   unique_lock l(lock);
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   uint32_t s = bl.length();
 
   // append
@@ -560,7 +592,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 
   // flush previous object?
   uint64_t su = get_layout_period();
-  assert(su > 0);
+  ceph_assert(su > 0);
   uint64_t write_off = write_pos % su;
   uint64_t write_obj = write_pos / su;
   uint64_t flush_obj = flush_pos / su;
@@ -568,8 +600,12 @@ uint64_t Journaler::append_entry(bufferlist& bl)
     ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
                   << write_obj << " flo " << flush_obj << ")" << dendl;
     _do_flush(write_buf.length() - write_off);
-    if (write_off) {
-      // current entry isn't being flushed, set next_safe_pos to the end of previous entry
+
+    // if _do_flush() skips flushing some data, it does do a best effort to
+    // update next_safe_pos.
+    if (write_buf.length() > 0 &&
+       write_buf.length() <= wrote) { // the unflushed data are within this entry
+      // set next_safe_pos to end of previous entry
       next_safe_pos = write_pos - wrote;
     }
   }
@@ -580,14 +616,16 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 
 void Journaler::_do_flush(unsigned amount)
 {
+  if (is_stopping())
+    return;
   if (write_pos == flush_pos)
     return;
-  assert(write_pos > flush_pos);
-  assert(!readonly);
+  ceph_assert(write_pos > flush_pos);
+  ceph_assert(!readonly);
 
   // flush
   uint64_t len = write_pos - flush_pos;
-  assert(len == write_buf.length());
+  ceph_assert(len == write_buf.length());
   if (amount && amount < len)
     len = amount;
 
@@ -602,19 +640,16 @@ void Journaler::_do_flush(unsigned amount)
       ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " already too close to prezero_pos " << prezero_pos
                     << ", zeroing first" << dendl;
-      waiting_for_zero = true;
+      waiting_for_zero_pos = flush_pos + len;
       return;
     }
     if (static_cast<uint64_t>(newlen) < len) {
       ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " but hit prezero_pos " << prezero_pos
                     << ", will do " << flush_pos << "~" << newlen << dendl;
+      waiting_for_zero_pos = flush_pos + len;
       len = newlen;
-    } else {
-      waiting_for_zero = false;
     }
-  } else {
-    waiting_for_zero = false;
   }
   ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
 
@@ -634,6 +669,17 @@ void Journaler::_do_flush(unsigned amount)
     next_safe_pos = write_pos;
   } else {
     write_buf.splice(0, len, &write_bl);
+    // Keys of waitfor_safe map are journal entry boundaries.
+    // Try finding a journal entry that we are actually flushing
+    // and set next_safe_pos to end of it. This is best effort.
+    // The one we found may not be the lastest flushing entry.
+    auto p = waitfor_safe.lower_bound(flush_pos + len);
+    if (p != waitfor_safe.end()) {
+      if (p->first > flush_pos + len && p != waitfor_safe.begin())
+       --p;
+      if (p->first <= flush_pos + len && p->first > next_safe_pos)
+       next_safe_pos = p->first;
+    }
   }
 
   filer.write(ino, &layout, snapc,
@@ -642,7 +688,7 @@ void Journaler::_do_flush(unsigned amount)
              wrap_finisher(onsafe), write_iohint);
 
   flush_pos += len;
-  assert(write_buf.length() == write_pos - flush_pos);
+  ceph_assert(write_buf.length() == write_pos - flush_pos);
   write_buf_throttle.put(len);
   ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
  
@@ -658,8 +704,9 @@ void Journaler::_do_flush(unsigned amount)
 void Journaler::wait_for_flush(Context *onsafe)
 {
   lock_guard l(lock);
-  if (stopping) {
-    onsafe->complete(-EAGAIN);
+  if (is_stopping()) {
+    if (onsafe)
+      onsafe->complete(-EAGAIN);
     return;
   }
   _wait_for_flush(onsafe);
@@ -667,11 +714,11 @@ void Journaler::wait_for_flush(Context *onsafe)
 
 void Journaler::_wait_for_flush(Context *onsafe)
 {
-  assert(!readonly);
+  ceph_assert(!readonly);
 
   // all flushed and safe?
   if (write_pos == safe_pos) {
-    assert(write_buf.length() == 0);
+    ceph_assert(write_buf.length() == 0);
     ldout(cct, 10)
       << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
       "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
@@ -691,15 +738,20 @@ void Journaler::_wait_for_flush(Context *onsafe)
 void Journaler::flush(Context *onsafe)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    if (onsafe)
+      onsafe->complete(-EAGAIN);
+    return;
+  }
   _flush(wrap_finisher(onsafe));
 }
 
 void Journaler::_flush(C_OnFinisher *onsafe)
 {
-  assert(!readonly);
+  ceph_assert(!readonly);
 
   if (write_pos == flush_pos) {
-    assert(write_buf.length() == 0);
+    ceph_assert(write_buf.length() == 0);
     ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
       "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
                   << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
@@ -720,7 +772,7 @@ void Journaler::_flush(C_OnFinisher *onsafe)
 
 bool Journaler::_write_head_needed()
 {
-  return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
+  return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval"))
       < ceph::real_clock::now();
 }
 
@@ -739,12 +791,9 @@ struct C_Journaler_Prezero : public Context {
 
 void Journaler::_issue_prezero()
 {
-  assert(prezeroing_pos >= flush_pos);
-
-  // we need to zero at least two periods, minimum, to ensure that we
-  // have a full empty object/period in front of us.
-  uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
+  ceph_assert(prezeroing_pos >= flush_pos);
 
+  uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
   /*
    * issue zero requests based on write_pos, even though the invariant
    * is that we zero ahead of flush_pos.
@@ -796,7 +845,7 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
     return;
   }
 
-  assert(r == 0 || r == -ENOENT);
+  ceph_assert(r == 0 || r == -ENOENT);
 
   if (start == prezero_pos) {
     prezero_pos += len;
@@ -807,8 +856,15 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
       pending_zero.erase(b);
     }
 
-    if (waiting_for_zero) {
-      _do_flush();
+    if (waiting_for_zero_pos > flush_pos) {
+      _do_flush(waiting_for_zero_pos - flush_pos);
+    }
+
+    if (prezero_pos == prezeroing_pos &&
+       !waitfor_prezero.empty()) {
+      list<Context*> ls;
+      ls.swap(waitfor_prezero);
+      finish_contexts(cct, ls, 0);
     }
   } else {
     pending_zero.insert(start, len);
@@ -819,6 +875,17 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
                 << dendl;
 }
 
+void Journaler::wait_for_prezero(Context *onfinish)
+{
+  ceph_assert(onfinish);
+  lock_guard l(lock);
+
+  if (prezero_pos == prezeroing_pos) {
+    finisher->queue(onfinish, 0);
+    return;
+  }
+  waitfor_prezero.push_back(wrap_finisher(onfinish));
+}
 
 
 /***************** READING *******************/
@@ -911,15 +978,15 @@ void Journaler::_assimilate_prefetch()
                   << p->second.length() << dendl;
     received_pos += p->second.length();
     read_buf.claim_append(p->second);
-    assert(received_pos <= requested_pos);
+    ceph_assert(received_pos <= requested_pos);
     prefetch_buf.erase(p);
     got_any = true;
   }
 
   if (got_any) {
     ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
-                  << read_buf.length() << ", read pointers " << read_pos
-                  << "/" << received_pos << "/" << requested_pos
+                  << read_buf.length() << ", read pointers read_pos=" << read_pos 
+                   << " received_pos=" << received_pos << " requested_pos=" << requested_pos
                   << dendl;
 
     // Update readability (this will also hit any decode errors resulting
@@ -944,15 +1011,24 @@ void Journaler::_issue_read(uint64_t len)
 {
   // stuck at safe_pos?  (this is needed if we are reading the tail of
   // a journal we are also writing to)
-  assert(requested_pos <= safe_pos);
+  ceph_assert(requested_pos <= safe_pos);
   if (requested_pos == safe_pos) {
     ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
                   << ", waiting" << dendl;
-    assert(write_pos > requested_pos);
+    ceph_assert(write_pos > requested_pos);
     if (pending_safe.empty()) {
       _flush(NULL);
     }
-    waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
+
+    // Make sure keys of waitfor_safe map are journal entry boundaries.
+    // The key we used here is either next_safe_pos or old value of
+    // next_safe_pos. next_safe_pos is always set to journal entry
+    // boundary.
+    auto p = pending_safe.rbegin();
+    if (p != pending_safe.rend())
+      waitfor_safe[p->second].push_back(new C_RetryRead(this));
+    else
+      waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
     return;
   }
 
@@ -965,8 +1041,8 @@ void Journaler::_issue_read(uint64_t len)
 
   // go.
   ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
-                << ", read pointers " << read_pos << "/" << received_pos
-                << "/" << (requested_pos+len) << dendl;
+                << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
+                << " requested_pos+len=" << (requested_pos+len) << dendl;
 
   // step by period (object).  _don't_ do a single big filer.read()
   // here because it will wait for all object reads to complete before
@@ -989,6 +1065,9 @@ void Journaler::_issue_read(uint64_t len)
 
 void Journaler::_prefetch()
 {
+  if (is_stopping())
+    return;
+
   ldout(cct, 10) << "_prefetch" << dendl;
   // prefetch
   uint64_t pf;
@@ -1061,7 +1140,8 @@ bool Journaler::_is_readable()
 
     // adjust write_pos
     prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
-    assert(write_buf.length() == 0);
+    ceph_assert(write_buf.length() == 0);
+    ceph_assert(waitfor_safe.empty());
 
     // reset read state
     requested_pos = received_pos = read_pos;
@@ -1133,6 +1213,10 @@ void Journaler::erase(Context *completion)
 void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    completion->complete(-EAGAIN);
+    return;
+  }
 
   if (data_result == 0) {
     // Async delete the journal header
@@ -1165,7 +1249,7 @@ bool Journaler::try_read_entry(bufferlist& bl)
   try {
     consumed = journal_stream.read(read_buf, &bl, &start_ptr);
     if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
-      assert(start_ptr == read_pos);
+      ceph_assert(start_ptr == read_pos);
     }
   } catch (const buffer::error &e) {
     lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
@@ -1189,18 +1273,25 @@ bool Journaler::try_read_entry(bufferlist& bl)
 
   // prefetch?
   _prefetch();
+
+  // If bufferlist consists of discontiguous memory, decoding types whose
+  // denc_traits needs contiguous memory is inefficient. The bufferlist may
+  // get copied to temporary memory multiple times (copy_shallow() in
+  // src/include/denc.h actually does deep copy)
+  if (bl.get_num_buffers() > 1)
+    bl.rebuild();
   return true;
 }
 
 void Journaler::wait_for_readable(Context *onreadable)
 {
   lock_guard l(lock);
-  if (stopping) {
-    onreadable->complete(-EAGAIN);
+  if (is_stopping()) {
+    finisher->queue(onreadable, -EAGAIN);
     return;
   }
 
-  assert(on_readable == 0);
+  ceph_assert(on_readable == 0);
   if (!readable) {
     ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
                   << onreadable << dendl;
@@ -1240,7 +1331,10 @@ void Journaler::trim()
 
 void Journaler::_trim()
 {
-  assert(!readonly);
+  if (is_stopping())
+    return;
+
+  ceph_assert(!readonly);
   uint64_t period = get_layout_period();
   uint64_t trim_to = last_committed.expire_pos;
   trim_to -= trim_to % period;
@@ -1260,9 +1354,9 @@ void Journaler::_trim()
   }
 
   // trim
-  assert(trim_to <= write_pos);
-  assert(trim_to <= expire_pos);
-  assert(trim_to > trimming_pos);
+  ceph_assert(trim_to <= write_pos);
+  ceph_assert(trim_to <= expire_pos);
+  ceph_assert(trim_to > trimming_pos);
   ldout(cct, 10) << "trim trimming to " << trim_to
                 << ", trimmed/trimming/expire are "
                 << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
@@ -1282,7 +1376,7 @@ void Journaler::_finish_trim(int r, uint64_t to)
 {
   lock_guard l(lock);
 
-  assert(!readonly);
+  ceph_assert(!readonly);
   ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
           << ", trimmed/trimming/expire now "
           << to << "/" << trimming_pos << "/" << expire_pos
@@ -1293,10 +1387,10 @@ void Journaler::_finish_trim(int r, uint64_t to)
     return;
   }
 
-  assert(r >= 0 || r == -ENOENT);
+  ceph_assert(r >= 0 || r == -ENOENT);
 
-  assert(to <= trimming_pos);
-  assert(to > trimmed_pos);
+  ceph_assert(to <= trimming_pos);
+  ceph_assert(to > trimmed_pos);
   trimmed_pos = to;
 }
 
@@ -1316,7 +1410,7 @@ void Journaler::handle_write_error(int r)
     lderr(cct) << __func__ << ": multiple write errors, handler already called"
               << dendl;
   } else {
-    assert(0 == "unhandled write error");
+    ceph_abort_msg("unhandled write error");
   }
 }
 
@@ -1331,11 +1425,11 @@ void Journaler::handle_write_error(int r)
  */
 bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
 {
-  assert(need != NULL);
+  ceph_assert(need != NULL);
 
   uint32_t entry_size = 0;
   uint64_t entry_sentinel = 0;
-  bufferlist::iterator p = read_buf.begin();
+  auto p = read_buf.cbegin();
 
   // Do we have enough data to decode an entry prefix?
   if (format >= JOURNAL_FORMAT_RESILIENT) {
@@ -1345,13 +1439,13 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
   }
   if (read_buf.length() >= *need) {
     if (format >= JOURNAL_FORMAT_RESILIENT) {
-      ::decode(entry_sentinel, p);
+      decode(entry_sentinel, p);
       if (entry_sentinel != sentinel) {
        throw buffer::malformed_input("Invalid sentinel");
       }
     }
 
-    ::decode(entry_size, p);
+    decode(entry_size, p);
   } else {
     return false;
   }
@@ -1388,29 +1482,29 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
 size_t JournalStream::read(bufferlist &from, bufferlist *entry,
                           uint64_t *start_ptr)
 {
-  assert(start_ptr != NULL);
-  assert(entry != NULL);
-  assert(entry->length() == 0);
+  ceph_assert(start_ptr != NULL);
+  ceph_assert(entry != NULL);
+  ceph_assert(entry->length() == 0);
 
   uint32_t entry_size = 0;
 
   // Consume envelope prefix: entry_size and entry_sentinel
-  bufferlist::iterator from_ptr = from.begin();
+  auto from_ptr = from.cbegin();
   if (format >= JOURNAL_FORMAT_RESILIENT) {
     uint64_t entry_sentinel = 0;
-    ::decode(entry_sentinel, from_ptr);
+    decode(entry_sentinel, from_ptr);
     // Assertion instead of clean check because of precondition of this
     // fn is that readable() already passed
-    assert(entry_sentinel == sentinel);
+    ceph_assert(entry_sentinel == sentinel);
   }
-  ::decode(entry_size, from_ptr);
+  decode(entry_size, from_ptr);
 
   // Read out the payload
   from_ptr.copy(entry_size, *entry);
 
   // Consume the envelope suffix (start_ptr)
   if (format >= JOURNAL_FORMAT_RESILIENT) {
-    ::decode(*start_ptr, from_ptr);
+    decode(*start_ptr, from_ptr);
   } else {
     *start_ptr = 0;
   }
@@ -1428,16 +1522,16 @@ size_t JournalStream::read(bufferlist &from, bufferlist *entry,
 size_t JournalStream::write(bufferlist &entry, bufferlist *to,
                            uint64_t const &start_ptr)
 {
-  assert(to != NULL);
+  ceph_assert(to != NULL);
 
   uint32_t const entry_size = entry.length();
   if (format >= JOURNAL_FORMAT_RESILIENT) {
-    ::encode(sentinel, *to);
+    encode(sentinel, *to);
   }
-  ::encode(entry_size, *to);
+  encode(entry_size, *to);
   to->claim_append(entry);
   if (format >= JOURNAL_FORMAT_RESILIENT) {
-    ::encode(start_ptr, *to);
+    encode(start_ptr, *to);
   }
 
   if (format >= JOURNAL_FORMAT_RESILIENT) {
@@ -1463,7 +1557,7 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to,
  */
 void Journaler::set_write_error_handler(Context *c) {
   lock_guard l(lock);
-  assert(!on_write_error);
+  ceph_assert(!on_write_error);
   on_write_error = wrap_finisher(c);
   called_write_error = false;
 }
@@ -1490,8 +1584,8 @@ void Journaler::shutdown()
 
   ldout(cct, 1) << __func__ << dendl;
 
+  state = STATE_STOPPING;
   readable = false;
-  stopping = true;
 
   // Kick out anyone reading from journal
   error = -EAGAIN;
@@ -1501,7 +1595,9 @@ void Journaler::shutdown()
     f->complete(-EAGAIN);
   }
 
-  finish_contexts(cct, waitfor_recover, -ESHUTDOWN);
+  list<Context*> ls;
+  ls.swap(waitfor_recover);
+  finish_contexts(cct, ls, -ESHUTDOWN);
 
   std::map<uint64_t, std::list<Context*> >::iterator i;
   for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {