]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/filestore/FileJournal.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / os / filestore / FileJournal.cc
index 7e6a19cbf0e1f66c4037d2ff33bca696e8121df7..610b2b32da56904799f1933e811770a688c440f7 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "journal "
 
+using std::list;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+
 const static int64_t ONE_MEG(1 << 20);
 const static int CEPH_DIRECTIO_ALIGNMENT(4096);
 
@@ -71,7 +86,7 @@ int FileJournal::_open(bool forwrite, bool create)
           << cpp_strerror(err) << dendl;
     }
   }
-  fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
+  fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644));
   if (fd < 0) {
     int err = errno;
     dout(2) << "FileJournal::_open unable to open journal "
@@ -146,7 +161,8 @@ int FileJournal::_open(bool forwrite, bool create)
 int FileJournal::_open_block_device()
 {
   int64_t bdev_sz = 0;
-  int ret = get_block_device_size(fd, &bdev_sz);
+  BlkDev blkdev(fd);
+  int ret = blkdev.get_size(&bdev_sz);
   if (ret) {
     dout(0) << __func__ << ": failed to read block device size." << dendl;
     return -EIO;
@@ -167,7 +183,7 @@ int FileJournal::_open_block_device()
   block_size = cct->_conf->journal_block_size;
 
   if (cct->_conf->journal_discard) {
-    discard = block_device_support_discard(fn.c_str());
+    discard = blkdev.support_discard();
     dout(10) << fn << " support discard: " << (int)discard << dendl;
   }
 
@@ -198,32 +214,13 @@ int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
           << newsize << " bytes: " << cpp_strerror(err) << dendl;
       return -err;
     }
-#ifdef HAVE_POSIX_FALLOCATE
-    ret = ::posix_fallocate(fd, 0, newsize);
+    ret = ceph_posix_fallocate(fd, 0, newsize);
     if (ret) {
       derr << "FileJournal::_open_file : unable to preallocation journal to "
           << newsize << " bytes: " << cpp_strerror(ret) << dendl;
       return -ret;
     }
     max_size = newsize;
-#elif defined(__APPLE__)
-    fstore_t store;
-    store.fst_flags = F_ALLOCATECONTIG;
-    store.fst_posmode = F_PEOFPOSMODE;
-    store.fst_offset = 0;
-    store.fst_length = newsize;
-
-    ret = ::fcntl(fd, F_PREALLOCATE, &store);
-    if (ret == -1) {
-      ret = -errno;
-      derr << "FileJournal::_open_file : unable to preallocation journal to "
-          << newsize << " bytes: " << cpp_strerror(ret) << dendl;
-      return ret;
-    }
-    max_size = newsize;
-#else
-# error "Journal pre-allocation not supported on platform."
-#endif
   }
   else {
     max_size = oldsize;
@@ -243,18 +240,18 @@ int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
     for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
       ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
       if (ret < 0) {
-       free(buf);
+       aligned_free(buf);
        return -errno;
       }
     }
     if (i < (uint64_t)max_size) {
       ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
       if (ret < 0) {
-       free(buf);
+       aligned_free(buf);
        return -errno;
       }
     }
-    free(buf);
+    aligned_free(buf);
   }
 
 
@@ -269,7 +266,7 @@ int FileJournal::check()
 {
   int ret;
 
-  assert(fd == -1);
+  ceph_assert(fd == -1);
   ret = _open(false, false);
   if (ret)
     return ret;
@@ -299,7 +296,7 @@ int FileJournal::create()
   void *buf = 0;
   int64_t needed_space;
   int ret;
-  buffer::ptr bp;
+  ceph::buffer::ptr bp;
   dout(2) << "create " << fn << " fsid " << fsid << dendl;
 
   ret = _open(true, true);
@@ -351,7 +348,7 @@ int FileJournal::create()
     goto free_buf;
   }
 
-  needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20;
+  needed_space = cct->_conf->osd_max_write_size << 20;
   needed_space += (2 * sizeof(entry_header_t)) + get_top();
   if (header.max_size - header.start < needed_space) {
     derr << "FileJournal::create: OSD journal is not large enough to hold "
@@ -380,7 +377,7 @@ done:
 // This can not be used on an active journal
 int FileJournal::peek_fsid(uuid_d& fsid)
 {
-  assert(fd == -1);
+  ceph_assert(fd == -1);
   int r = _open(false, false);
   if (r)
     return r;
@@ -398,6 +395,7 @@ int FileJournal::open(uint64_t fs_op_seq)
   dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
 
   uint64_t next_seq = fs_op_seq + 1;
+  uint64_t seq = -1;
 
   int err = _open(false);
   if (err)
@@ -410,7 +408,7 @@ int FileJournal::open(uint64_t fs_op_seq)
   // read header?
   err = read_header(&header);
   if (err < 0)
-    return err;
+    goto out;
 
   // static zeroed buffer for alignment padding
   delete [] zero_buf;
@@ -423,32 +421,38 @@ int FileJournal::open(uint64_t fs_op_seq)
   if (header.fsid != fsid) {
     derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
          << ", invalid (someone else's?) journal" << dendl;
-    return -EINVAL;
+    err = -EINVAL;
+    goto out;
   }
   if (header.max_size > max_size) {
     dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
-    return -EINVAL;
+    err = -EINVAL;
+    goto out;
   }
   if (header.block_size != block_size) {
     dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
-    return -EINVAL;
+    err = -EINVAL;
+    goto out;
   }
   if (header.max_size % header.block_size) {
     dout(2) << "open journal max size " << header.max_size
            << " not a multiple of block size " << header.block_size << dendl;
-    return -EINVAL;
+    err = -EINVAL;
+    goto out;
   }
   if (header.alignment != block_size && directio) {
     dout(0) << "open journal alignment " << header.alignment << " does not match block size "
            << block_size << " (required for direct_io journal mode)" << dendl;
-    return -EINVAL;
+    err = -EINVAL;
+    goto out;
   }
   if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
     dout(0) << "open journal alignment " << header.alignment
            << " is not multiple of minimum directio alignment "
            << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
            << dendl;
-    return -EINVAL;
+    err = -EINVAL;
+    goto out;
   }
 
   // looks like a valid header.
@@ -458,16 +462,7 @@ int FileJournal::open(uint64_t fs_op_seq)
 
   // find next entry
   read_pos = header.start;
-  uint64_t seq = header.start_seq;
-
-  // last_committed_seq is 1 before the start of the journal or
-  // 0 if the start is 0
-  last_committed_seq = seq > 0 ? seq - 1 : seq;
-  if (last_committed_seq < fs_op_seq) {
-    dout(2) << "open advancing committed_seq " << last_committed_seq
-           << " to fs op_seq " << fs_op_seq << dendl;
-    last_committed_seq = fs_op_seq;
-  }
+  seq = header.start_seq;
 
   while (1) {
     bufferlist bl;
@@ -493,6 +488,9 @@ int FileJournal::open(uint64_t fs_op_seq)
   }
 
   return 0;
+out:
+  close();
+  return err;
 }
 
 void FileJournal::_close(int fd) const
@@ -508,9 +506,9 @@ void FileJournal::close()
   stop_writer();
 
   // close
-  assert(writeq_empty());
-  assert(!must_write_header);
-  assert(fd >= 0);
+  ceph_assert(writeq_empty());
+  ceph_assert(!must_write_header);
+  ceph_assert(fd >= 0);
   _close(fd);
   fd = -1;
 }
@@ -538,7 +536,7 @@ int FileJournal::_fdump(Formatter &f, bool simple)
 {
   dout(10) << "_fdump" << dendl;
 
-  assert(fd == -1);
+  ceph_assert(fd == -1);
   int err = _open(false, false);
   if (err)
     return err;
@@ -604,7 +602,7 @@ int FileJournal::_fdump(Formatter &f, bool simple)
       f.dump_unsigned("bl.length", bl.length());
     } else {
       f.open_array_section("transactions");
-      bufferlist::iterator p = bl.begin();
+      auto p = bl.cbegin();
       int trans_num = 0;
       while (!p.end()) {
         ObjectStore::Transaction t(p);
@@ -645,13 +643,13 @@ void FileJournal::stop_writer()
   if (!write_stop)
   {
     {
-      Mutex::Locker l(write_lock);
-      Mutex::Locker p(writeq_lock);
+      std::lock_guard l{write_lock};
+      std::lock_guard p{writeq_lock};
       write_stop = true;
-      writeq_cond.Signal();
+      writeq_cond.notify_all();
       // Doesn't hurt to signal commit_cond in case thread is waiting there
       // and caller didn't use committed_thru() first.
-      commit_cond.Signal();
+      commit_cond.notify_all();
     }
     write_thread.join();
 
@@ -663,11 +661,11 @@ void FileJournal::stop_writer()
   // stop aio completeion thread *after* writer thread has stopped
   // and has submitted all of its io
   if (aio && !aio_stop) {
-    aio_lock.Lock();
+    aio_lock.lock();
     aio_stop = true;
-    aio_cond.Signal();
-    write_finish_cond.Signal();
-    aio_lock.Unlock();
+    aio_cond.notify_all();
+    write_finish_cond.notify_all();
+    aio_lock.unlock();
     write_finish_thread.join();
   }
 #endif
@@ -690,7 +688,7 @@ int FileJournal::read_header(header_t *hdr) const
   dout(10) << "read_header" << dendl;
   bufferlist bl;
 
-  buffer::ptr bp = buffer::create_page_aligned(block_size);
+  ceph::buffer::ptr bp = ceph::buffer::create_small_page_aligned(block_size);
   char* bpdata = bp.c_str();
   int r = ::pread(fd, bpdata, bp.length(), 0);
 
@@ -711,10 +709,10 @@ int FileJournal::read_header(header_t *hdr) const
   bl.push_back(std::move(bp));
 
   try {
-    bufferlist::iterator p = bl.begin();
-    ::decode(*hdr, p);
+    auto p = bl.cbegin();
+    decode(*hdr, p);
   }
-  catch (buffer::error& e) {
+  catch (ceph::buffer::error& e) {
     derr << "read_header error decoding journal header" << dendl;
     return -EINVAL;
   }
@@ -741,11 +739,11 @@ bufferptr FileJournal::prepare_header()
 {
   bufferlist bl;
   {
-    Mutex::Locker l(finisher_lock);
+    std::lock_guard l{finisher_lock};
     header.committed_up_to = journaled_seq;
   }
-  ::encode(header, bl);
-  bufferptr bp = buffer::create_page_aligned(get_top());
+  encode(header, bl);
+  bufferptr bp = ceph::buffer::create_small_page_aligned(get_top());
   // don't use bp.zero() here, because it also invalidates
   // crc cache (which is not yet populated anyway)
   char* data = bp.c_str();
@@ -757,7 +755,7 @@ bufferptr FileJournal::prepare_header()
 
 void FileJournal::write_header_sync()
 {
-  Mutex::Locker locker(write_lock);
+  std::lock_guard locker{write_lock};
   must_write_header = true;
   bufferlist bl;
   do_write(bl);
@@ -783,7 +781,11 @@ int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
     if (room >= (header.max_size >> 1) &&
         room - size < (header.max_size >> 1)) {
       dout(10) << " passing half full mark, triggering commit" << dendl;
-      do_sync_cond->SloppySignal();  // initiate a real commit so we can trim
+#ifdef CEPH_DEBUG_MUTEX
+      do_sync_cond->notify_all(true);  // initiate a real commit so we can trim
+#else
+      do_sync_cond->notify_all();
+#endif
     }
   }
 
@@ -829,10 +831,10 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
        items.erase(it++);
 #ifdef HAVE_LIBAIO
        {
-         Mutex::Locker locker(aio_lock);
-         assert(aio_write_queue_ops > 0);
+         std::lock_guard locker{aio_lock};
+         ceph_assert(aio_write_queue_ops > 0);
          aio_write_queue_ops--;
-         assert(aio_write_queue_bytes >= bytes);
+         ceph_assert(aio_write_queue_bytes >= bytes);
          aio_write_queue_bytes -= bytes;
        }
 #else
@@ -861,7 +863,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
           }
           print_header(header);
         }
-        
+
         return -ENOSPC;  // hrm, full on first op
       }
       if (eleft) {
@@ -885,7 +887,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
 
 out:
   dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
-  assert((write_pos + bl.length() == queue_pos) ||
+  ceph_assert((write_pos + bl.length() == queue_pos) ||
          (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
   return 0;
 }
@@ -911,7 +913,7 @@ void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
 
 void FileJournal::queue_completions_thru(uint64_t seq)
 {
-  assert(finisher_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(finisher_lock));
   utime_t now = ceph_clock_now();
   list<completion_item> items;
   batch_pop_completions(items);
@@ -939,7 +941,7 @@ void FileJournal::queue_completions_thru(uint64_t seq)
     items.erase(it++);
   }
   batch_unpop_completions(items);
-  finisher_cond.Signal();
+  finisher_cond.notify_all();
 }
 
 
@@ -999,9 +1001,9 @@ void FileJournal::check_align(off64_t pos, bufferlist& bl)
 {
   // make sure list segments are page aligned
   if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
-    assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
-    assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
-    assert(0 == "bl was not aligned");
+    ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
+    ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
+    ceph_abort_msg("bl was not aligned");
   }
 }
 
@@ -1032,7 +1034,7 @@ void FileJournal::do_write(bufferlist& bl)
   if (bl.length() == 0 && !must_write_header)
     return;
 
-  buffer::ptr hbp;
+  ceph::buffer::ptr hbp;
   if (cct->_conf->journal_write_header_frequency &&
       (((++journaled_since_start) %
        cct->_conf->journal_write_header_frequency) == 0)) {
@@ -1058,7 +1060,7 @@ void FileJournal::do_write(bufferlist& bl)
   if (write_pos >= header.max_size)
     write_pos = write_pos - header.max_size + get_top();
 
-  write_lock.Unlock();
+  write_lock.unlock();
 
   // split?
   off64_t split = 0;
@@ -1067,7 +1069,7 @@ void FileJournal::do_write(bufferlist& bl)
     split = header.max_size - pos;
     first.substr_of(bl, 0, split);
     second.substr_of(bl, split, bl.length() - split);
-    assert(first.length() + second.length() == bl.length());
+    ceph_assert(first.length() + second.length() == bl.length());
     dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
             << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
 
@@ -1078,7 +1080,10 @@ void FileJournal::do_write(bufferlist& bl)
     // header too?
     if (hbp.length()) {
       // be sneaky: include the header in the second fragment
-      second.push_front(hbp);
+      bufferlist tmp;
+      tmp.push_back(hbp);
+      tmp.claim_append(second);
+      second.swap(tmp);
       pos = 0;          // we included the header
     }
     // Write the second portion first possible with the header, so
@@ -1098,7 +1103,7 @@ void FileJournal::do_write(bufferlist& bl)
       check_align(first_pos, first);
       ceph_abort();
     }
-    assert(first_pos == get_top());
+    ceph_assert(first_pos == get_top());
   } else {
     // header too?
     if (hbp.length()) {
@@ -1138,7 +1143,7 @@ void FileJournal::do_write(bufferlist& bl)
      * flush disk caches or commits any sort of metadata.
      */
     int ret = 0;
-#if defined(DARWIN) || defined(__FreeBSD__)
+#if defined(__APPLE__) || defined(__FreeBSD__)
     ret = ::fsync(fd);
 #else
     ret = ::fdatasync(fd);
@@ -1156,13 +1161,13 @@ void FileJournal::do_write(bufferlist& bl)
   utime_t lat = ceph_clock_now() - from;
   dout(20) << "do_write latency " << lat << dendl;
 
-  write_lock.Lock();
+  write_lock.lock();
 
-  assert(write_pos == pos);
-  assert(write_pos % header.alignment == 0);
+  ceph_assert(write_pos == pos);
+  ceph_assert(write_pos % header.alignment == 0);
 
   {
-    Mutex::Locker locker(finisher_lock);
+    std::lock_guard locker{finisher_lock};
     journaled_seq = writing_seq;
 
     // kick finisher?
@@ -1186,9 +1191,8 @@ void FileJournal::flush()
 {
   dout(10) << "waiting for completions to empty" << dendl;
   {
-    Mutex::Locker l(finisher_lock);
-    while (!completions_empty())
-      finisher_cond.Wait(finisher_lock);
+    std::unique_lock l{finisher_lock};
+    finisher_cond.wait(l, [this] { return completions_empty(); });
   }
   dout(10) << "flush waiting for finisher" << dendl;
   finisher->wait_for_empty();
@@ -1201,12 +1205,12 @@ void FileJournal::write_thread_entry()
   dout(10) << "write_thread_entry start" << dendl;
   while (1) {
     {
-      Mutex::Locker locker(writeq_lock);
+      std::unique_lock locker{writeq_lock};
       if (writeq.empty() && !must_write_header) {
        if (write_stop)
          break;
        dout(20) << "write_thread_entry going to sleep" << dendl;
-       writeq_cond.Wait(writeq_lock);
+       writeq_cond.wait(locker);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
       }
@@ -1214,7 +1218,7 @@ void FileJournal::write_thread_entry()
 
 #ifdef HAVE_LIBAIO
     if (aio) {
-      Mutex::Locker locker(aio_lock);
+      std::unique_lock locker{aio_lock};
       // should we back off to limit aios in flight?  try to do this
       // adaptively so that we submit larger aios once we have lots of
       // them in flight.
@@ -1227,7 +1231,7 @@ void FileJournal::write_thread_entry()
       // flight if we hit this limit to ensure we keep the device
       // saturated.
       while (aio_num > 0) {
-       int exp = MIN(aio_num * 2, 24);
+       int exp = std::min<int>(aio_num * 2, 24);
        long unsigned min_new = 1ull << exp;
        uint64_t cur = aio_write_queue_bytes;
        dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
@@ -1238,13 +1242,13 @@ void FileJournal::write_thread_entry()
        dout(20) << "write_thread_entry deferring until more aios complete: "
                 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
                 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
-       aio_cond.Wait(aio_lock);
+       aio_cond.wait(locker);
        dout(20) << "write_thread_entry woke up" << dendl;
       }
     }
 #endif
 
-    Mutex::Locker locker(write_lock);
+    std::unique_lock locker{write_lock};
     uint64_t orig_ops = 0;
     uint64_t orig_bytes = 0;
 
@@ -1263,12 +1267,12 @@ void FileJournal::write_thread_entry()
        r = 0;
       } else {
        dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
-       commit_cond.Wait(write_lock);
+       commit_cond.wait(locker);
        dout(20) << "write_thread_entry woke up" << dendl;
        continue;
       }
     }
-    assert(r == 0);
+    ceph_assert(r == 0);
 
     if (logger) {
       logger->inc(l_filestore_journal_wr);
@@ -1303,7 +1307,7 @@ void FileJournal::do_aio_write(bufferlist& bl)
   if (bl.length() == 0 && !must_write_header)
     return;
 
-  buffer::ptr hbp;
+  ceph::buffer::ptr hbp;
   if (must_write_header) {
     must_write_header = false;
     hbp = prepare_header();
@@ -1323,7 +1327,7 @@ void FileJournal::do_aio_write(bufferlist& bl)
     split = header.max_size - pos;
     first.substr_of(bl, 0, split);
     second.substr_of(bl, split, bl.length() - split);
-    assert(first.length() + second.length() == bl.length());
+    ceph_assert(first.length() + second.length() == bl.length());
     dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
 
     if (write_aio_bl(pos, first, 0)) {
@@ -1331,10 +1335,13 @@ void FileJournal::do_aio_write(bufferlist& bl)
           << ") failed" << dendl;
       ceph_abort();
     }
-    assert(pos == header.max_size);
+    ceph_assert(pos == header.max_size);
     if (hbp.length()) {
       // be sneaky: include the header in the second fragment
-      second.push_front(hbp);
+      bufferlist tmp;
+      tmp.push_back(hbp);
+      tmp.claim_append(second);
+      second.swap(tmp);
       pos = 0;          // we included the header
     } else
       pos = get_top();  // no header, start after that
@@ -1365,7 +1372,7 @@ void FileJournal::do_aio_write(bufferlist& bl)
   write_pos = pos;
   if (write_pos == header.max_size)
     write_pos = get_top();
-  assert(write_pos % header.alignment == 0);
+  ceph_assert(write_pos % header.alignment == 0);
 }
 
 /**
@@ -1379,15 +1386,13 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
   dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
 
   while (bl.length() > 0) {
-    int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
+    int max = std::min<int>(bl.get_num_buffers(), IOV_MAX-1);
     iovec *iov = new iovec[max];
     int n = 0;
     unsigned len = 0;
-    for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
-        n < max;
-        ++p, ++n) {
-      assert(p != bl.buffers().end());
-      iov[n].iov_base = (void *)p->c_str();
+    for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) {
+      ceph_assert(p != std::cend(bl.buffers()));
+      iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str()));
       iov[n].iov_len = p->length();
       len += p->length();
     }
@@ -1397,7 +1402,7 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
 
     // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
     // modified in check_aio_completion
-    aio_lock.Lock();
+    aio_lock.lock();
     aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
     aio_info& aio = aio_queue.back();
     aio.iov = iov;
@@ -1414,10 +1419,10 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
     // aio could be ereased from aio_queue once it is done
     uint64_t cur_len = aio.len;
     // unlock aio_lock because following io_submit might take time to return
-    aio_lock.Unlock();
+    aio_lock.unlock();
 
     iocb *piocb = &aio.iocb;
-    
+
     // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
     int attempts = 16;
     int delay = 125;
@@ -1433,16 +1438,16 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
          continue;
        }
        check_align(pos, tbl);
-       assert(0 == "io_submit got unexpected error");
+       ceph_abort_msg("io_submit got unexpected error");
       } else {
        break;
       }
     } while (true);
     pos += cur_len;
   }
-  aio_lock.Lock();
-  write_finish_cond.Signal();
-  aio_lock.Unlock();
+  aio_lock.lock();
+  write_finish_cond.notify_all();
+  aio_lock.unlock();
   return 0;
 }
 #endif
@@ -1450,20 +1455,20 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
 void FileJournal::write_finish_thread_entry()
 {
 #ifdef HAVE_LIBAIO
-  dout(10) << "write_finish_thread_entry enter" << dendl;
+  dout(10) << __func__ << " enter" << dendl;
   while (true) {
     {
-      Mutex::Locker locker(aio_lock);
+      std::unique_lock locker{aio_lock};
       if (aio_queue.empty()) {
        if (aio_stop)
          break;
-       dout(20) << "write_finish_thread_entry sleeping" << dendl;
-       write_finish_cond.Wait(aio_lock);
+       dout(20) << __func__ << " sleeping" << dendl;
+       write_finish_cond.wait(locker);
        continue;
       }
     }
 
-    dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl;
+    dout(20) << __func__ << " waiting for aio(s)" << dendl;
     io_event event[16];
     int r = io_getevents(aio_ctx, 1, 16, event, NULL);
     if (r < 0) {
@@ -1472,26 +1477,29 @@ void FileJournal::write_finish_thread_entry()
        continue;
       }
       derr << "io_getevents got " << cpp_strerror(r) << dendl;
-      assert(0 == "got unexpected error from io_getevents");
+      if (r == -EIO) {
+       note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0);
+      }
+      ceph_abort_msg("got unexpected error from io_getevents");
     }
 
     {
-      Mutex::Locker locker(aio_lock);
+      std::lock_guard locker{aio_lock};
       for (int i=0; i<r; i++) {
        aio_info *ai = (aio_info *)event[i].obj;
        if (event[i].res != ai->len) {
          derr << "aio to " << ai->off << "~" << ai->len
               << " returned: " << (int)event[i].res << dendl;
-         assert(0 == "unexpected aio error");
+         ceph_abort_msg("unexpected aio error");
        }
-       dout(10) << "write_finish_thread_entry aio " << ai->off
+       dout(10) << __func__ << " aio " << ai->off
                 << "~" << ai->len << " done" << dendl;
        ai->done = true;
       }
       check_aio_completion();
     }
   }
-  dout(10) << "write_finish_thread_entry exit" << dendl;
+  dout(10) << __func__ << " exit" << dendl;
 #endif
 }
 
@@ -1501,7 +1509,7 @@ void FileJournal::write_finish_thread_entry()
  */
 void FileJournal::check_aio_completion()
 {
-  assert(aio_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(aio_lock));
   dout(20) << "check_aio_completion" << dendl;
 
   bool completed_something = false, signal = false;
@@ -1524,7 +1532,7 @@ void FileJournal::check_aio_completion()
   if (completed_something) {
     // kick finisher?
     //  only if we haven't filled up recently!
-    Mutex::Locker locker(finisher_lock);
+    std::lock_guard locker{finisher_lock};
     journaled_seq = new_journaled_seq;
     if (full_state != FULL_NOTFULL) {
       dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
@@ -1541,7 +1549,7 @@ void FileJournal::check_aio_completion()
   }
   if (signal) {
     // maybe write queue was waiting for aio count to drop?
-    aio_cond.Signal();
+    aio_cond.notify_all();
   }
 }
 #endif
@@ -1557,7 +1565,7 @@ int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist
      data_len = (*p).get_data_length();
      data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
     }
-    ::encode(*p, bl);
+    encode(*p, bl);
   }
   if (tbl->length()) {
     bl.claim_append(*tbl);
@@ -1569,7 +1577,7 @@ int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist
   memset(&h, 0, sizeof(h));
   if (data_align >= 0)
     h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
-  off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
+  off64_t size = round_up_to(base_size + h.pre_pad, header.alignment);
   unsigned post_pad = size - base_size - h.pre_pad;
   h.len = bl.length();
   h.post_pad = post_pad;
@@ -1583,18 +1591,18 @@ int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist
   // header
   ebl.append((const char*)&h, sizeof(h));
   if (h.pre_pad) {
-    ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
+    ebl.push_back(ceph::buffer::create_static(h.pre_pad, zero_buf));
   }
   // payload
-  ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
+  ebl.claim_append(bl);
   if (h.post_pad) {
-    ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
+    ebl.push_back(ceph::buffer::create_static(h.post_pad, zero_buf));
   }
   // footer
   ebl.append((const char*)&h, sizeof(h));
   if (directio)
     ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
-  tbl->claim(ebl);
+  *tbl = std::move(ebl);
   return h.len;
 }
 
@@ -1605,11 +1613,9 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
   dout(5) << "submit_entry seq " << seq
          << " len " << e.length()
          << " (" << oncommit << ")" << dendl;
-  assert(e.length() > 0);
-  assert(e.length() < header.max_size);
+  ceph_assert(e.length() > 0);
+  ceph_assert(e.length() < header.max_size);
 
-  if (osd_op)
-    osd_op->mark_event("commit_queued_for_journal_write");
   if (logger) {
     logger->inc(l_filestore_journal_queue_bytes, orig_len);
     logger->inc(l_filestore_journal_queue_ops, 1);
@@ -1630,23 +1636,23 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
     }
   }
   {
-    Mutex::Locker l1(writeq_lock);
+    std::lock_guard l1{writeq_lock};
 #ifdef HAVE_LIBAIO
-    Mutex::Locker l2(aio_lock);
+    std::lock_guard l2{aio_lock};
 #endif
-    Mutex::Locker l3(completions_lock);
+    std::lock_guard l3{completions_lock};
 
 #ifdef HAVE_LIBAIO
     aio_write_queue_ops++;
     aio_write_queue_bytes += e.length();
-    aio_cond.Signal();
+    aio_cond.notify_all();
 #endif
 
     completions.push_back(
       completion_item(
        seq, oncommit, ceph_clock_now(), osd_op));
     if (writeq.empty())
-      writeq_cond.Signal();
+      writeq_cond.notify_all();
     writeq.push_back(write_item(seq, e, orig_len, osd_op));
     if (osd_op)
       osd_op->journal_trace.keyval("queue depth", writeq.size());
@@ -1655,21 +1661,21 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
 
 bool FileJournal::writeq_empty()
 {
-  Mutex::Locker locker(writeq_lock);
+  std::lock_guard locker{writeq_lock};
   return writeq.empty();
 }
 
 FileJournal::write_item &FileJournal::peek_write()
 {
-  assert(write_lock.is_locked());
-  Mutex::Locker locker(writeq_lock);
+  ceph_assert(ceph_mutex_is_locked(write_lock));
+  std::lock_guard locker{writeq_lock};
   return writeq.front();
 }
 
 void FileJournal::pop_write()
 {
-  assert(write_lock.is_locked());
-  Mutex::Locker locker(writeq_lock);
+  ceph_assert(ceph_mutex_is_locked(write_lock));
+  std::lock_guard locker{writeq_lock};
   if (logger) {
     logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
     logger->dec(l_filestore_journal_queue_ops, 1);
@@ -1679,9 +1685,9 @@ void FileJournal::pop_write()
 
 void FileJournal::batch_pop_write(list<write_item> &items)
 {
-  assert(write_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(write_lock));
   {
-    Mutex::Locker locker(writeq_lock);
+    std::lock_guard locker{writeq_lock};
     writeq.swap(items);
   }
   for (auto &&i : items) {
@@ -1694,14 +1700,14 @@ void FileJournal::batch_pop_write(list<write_item> &items)
 
 void FileJournal::batch_unpop_write(list<write_item> &items)
 {
-  assert(write_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(write_lock));
   for (auto &&i : items) {
     if (logger) {
       logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
       logger->inc(l_filestore_journal_queue_ops, 1);
     }
   }
-  Mutex::Locker locker(writeq_lock);
+  std::lock_guard locker{writeq_lock};
   writeq.splice(writeq.begin(), items);
 }
 
@@ -1742,21 +1748,24 @@ void FileJournal::commit_start(uint64_t seq)
  */
 void FileJournal::do_discard(int64_t offset, int64_t end)
 {
-  dout(10) << __func__ << "trim(" << offset << ", " << end << dendl;
+  dout(10) << __func__ << " trim(" << offset << ", " << end << dendl;
 
-  offset = ROUND_UP_TO(offset, block_size);
+  offset = round_up_to(offset, block_size);
   if (offset >= end)
     return;
-  end = ROUND_UP_TO(end - block_size, block_size);
-  assert(end >= offset);
-  if (offset < end)
-    if (block_device_discard(fd, offset, end - offset) < 0)
+  end = round_up_to(end - block_size, block_size);
+  ceph_assert(end >= offset);
+  if (offset < end) {
+    BlkDev blkdev(fd);
+    if (blkdev.discard(offset, end - offset) < 0) {
        dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
+    }
+  }
 }
 
 void FileJournal::committed_thru(uint64_t seq)
 {
-  Mutex::Locker locker(write_lock);
+  std::lock_guard locker{write_lock};
 
   auto released = throttle.flush(seq);
   if (logger) {
@@ -1766,7 +1775,7 @@ void FileJournal::committed_thru(uint64_t seq)
 
   if (seq < last_committed_seq) {
     dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
-    assert(seq >= last_committed_seq);
+    ceph_assert(seq >= last_committed_seq);
     return;
   }
   if (seq == last_committed_seq) {
@@ -1779,7 +1788,7 @@ void FileJournal::committed_thru(uint64_t seq)
 
   // completions!
   {
-    Mutex::Locker locker(finisher_lock);
+    std::lock_guard locker{finisher_lock};
     queue_completions_thru(seq);
     if (plug_journal_completions && seq >= header.start_seq) {
       dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
@@ -1824,7 +1833,7 @@ void FileJournal::committed_thru(uint64_t seq)
     pop_write();
   }
 
-  commit_cond.Signal();
+  commit_cond.notify_all();
 
   dout(10) << "committed_thru done" << dendl;
 }
@@ -1909,13 +1918,13 @@ void FileJournal::wrap_read_bl(
       len = olen;                         // rest
 
     int64_t actual = ::lseek64(fd, pos, SEEK_SET);
-    assert(actual == pos);
+    ceph_assert(actual == pos);
 
-    bufferptr bp = buffer::create(len);
+    bufferptr bp = ceph::buffer::create(len);
     int r = safe_read_exact(fd, bp.c_str(), len);
     if (r) {
       derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
-          << r << dendl;
+          << cpp_strerror(r) << dendl;
       ceph_abort();
     }
     bl->push_back(std::move(bp));
@@ -1972,6 +1981,8 @@ bool FileJournal::read_entry(
         journaled_seq = seq;
       return true;
     }
+  } else {
+    derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
   }
 
   if (seq && seq < header.committed_up_to) {
@@ -1987,7 +1998,6 @@ bool FileJournal::read_entry(
     }
   }
 
-  dout(25) << ss.str() << dendl;
   dout(2) << "No further valid entries found, journal is most likely valid"
          << dendl;
   return false;
@@ -2076,7 +2086,7 @@ FileJournal::read_entry_result FileJournal::do_read_entry(
   if (_h)
     *_h = *h;
 
-  assert(cur_pos % header.alignment == 0);
+  ceph_assert(cur_pos % header.alignment == 0);
   return SUCCESS;
 }
 
@@ -2125,18 +2135,18 @@ void FileJournal::corrupt(
     corrupt_at = corrupt_at + get_top() - header.max_size;
 
   int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
-  assert(actual == corrupt_at);
+  ceph_assert(actual == corrupt_at);
 
   char buf[10];
   int r = safe_read_exact(fd, buf, 1);
-  assert(r == 0);
+  ceph_assert(r == 0);
 
   actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
-  assert(actual == corrupt_at);
+  ceph_assert(actual == corrupt_at);
 
   buf[0]++;
   r = safe_write(wfd, buf, 1);
-  assert(r == 0);
+  ceph_assert(r == 0);
 }
 
 void FileJournal::corrupt_payload(
@@ -2194,3 +2204,31 @@ off64_t FileJournal::get_journal_size_estimate()
   dout(20) << __func__ << " journal size=" << size << dendl;
   return size;
 }
+
+void FileJournal::get_devices(set<string> *ls)
+{
+  string dev_node;
+  BlkDev blkdev(fd);
+  if (int rc = blkdev.wholedisk(&dev_node); rc) {
+    return;
+  }
+  get_raw_devices(dev_node, ls);
+}
+
+void FileJournal::collect_metadata(map<string,string> *pm)
+{
+  BlkDev blkdev(fd);
+  char partition_path[PATH_MAX];
+  char dev_node[PATH_MAX];
+  if (blkdev.partition(partition_path, PATH_MAX)) {
+    (*pm)["backend_filestore_journal_partition_path"] = "unknown";
+  } else {
+    (*pm)["backend_filestore_journal_partition_path"] = string(partition_path);
+  }
+  if (blkdev.wholedisk(dev_node, PATH_MAX)) {
+    (*pm)["backend_filestore_journal_dev_node"] = "unknown";
+  } else {
+    (*pm)["backend_filestore_journal_dev_node"] = string(dev_node);
+    devname = dev_node;
+  }
+}