]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/ObjectCacher.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / osdc / ObjectCacher.cc
index b64d9abf2e113bf2c5a04cc52aebf94962784e24..b5c336b3a9a94142a7b6173a298aef3c1d29ebc4 100644 (file)
@@ -94,7 +94,7 @@ public:
 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
                                                      loff_t off)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
 
   // split off right
@@ -156,7 +156,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
 
 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
 
   ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
   if (left->get_journal_tid() == 0) {
@@ -206,7 +206,7 @@ bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
 
 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
 
   // do not merge rx buffers; last_read_tid may not match
@@ -251,7 +251,7 @@ void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
  */
 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
   while (left > 0) {
     if (p == data.end())
@@ -279,7 +279,7 @@ bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
  */
 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   if (data.empty())
       return true;
   map<loff_t, BufferHead*>::iterator first = data.begin();
@@ -300,7 +300,7 @@ int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                    map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                      << ex.offset << "~" << ex.length << dendl;
 
@@ -424,7 +424,7 @@ void ObjectCacher::Object::audit_buffers()
 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
                                                          ceph_tid_t tid)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   BufferHead *final = 0;
 
   ldout(oc->cct, 10) << "map_write oex " << ex.oid
@@ -556,7 +556,7 @@ void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
 
 void ObjectCacher::Object::truncate(loff_t s)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
 
   while (!data.empty()) {
@@ -583,7 +583,7 @@ void ObjectCacher::Object::truncate(loff_t s)
 void ObjectCacher::Object::discard(loff_t off, loff_t len,
                                    C_GatherBuilder* commit_gather)
 {
-  ceph_assert(oc->lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(oc->lock));
   ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
                     << dendl;
 
@@ -650,7 +650,7 @@ void ObjectCacher::Object::discard(loff_t off, loff_t len,
 
 
 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
-                          WritebackHandler& wb, Mutex& l,
+                          WritebackHandler& wb, ceph::mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
@@ -741,7 +741,7 @@ ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t truncate_seq)
 {
   // XXX: Add handling of nspace in object_locator_t in cache
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   // have it?
   if ((uint32_t)l.pool < objects.size()) {
     if (objects[l.pool].count(oid)) {
@@ -765,7 +765,7 @@ ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
 
 void ObjectCacher::close_object(Object *ob)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 10) << "close_object " << *ob << dendl;
   ceph_assert(ob->can_close());
 
@@ -779,7 +779,7 @@ void ObjectCacher::close_object(Object *ob)
 void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                            const ZTracer::Trace &parent_trace)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;
 
@@ -811,7 +811,7 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  uint64_t length, bufferlist &bl, int r,
                                  bool trust_enoent)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 7) << "bh_read_finish "
                << oid
                << " tid " << tid
@@ -984,7 +984,7 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
   retry_waiting_reads();
 
   --reads_outstanding;
-  read_cond.Signal();
+  read_cond.notify_all();
 }
 
 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
@@ -1059,7 +1059,7 @@ public:
 };
 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
 
   Object *ob = blist.front()->ob;
   ob->get();
@@ -1112,7 +1112,7 @@ void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
 
 void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 7) << "bh_write " << *bh << dendl;
 
   bh->ob->get();
@@ -1154,7 +1154,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;
 
@@ -1261,7 +1261,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
 void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
 {
   ceph_assert(trace != nullptr);
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph::real_time cutoff = ceph::real_clock::now();
 
   ldout(cct, 10) << "flush " << amount << dendl;
@@ -1291,7 +1291,7 @@ void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
 
 void ObjectCacher::trim()
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
@@ -1341,7 +1341,7 @@ void ObjectCacher::trim()
 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
                             snapid_t snapid)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   for (vector<ObjectExtent>::iterator ex_it = extents.begin();
        ex_it != extents.end();
        ++ex_it) {
@@ -1384,7 +1384,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
 {
   ceph_assert(trace != nullptr);
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   bool success = true;
   int error = 0;
   uint64_t bytes_in_cache = 0;
@@ -1706,7 +1706,7 @@ void ObjectCacher::retry_waiting_reads()
 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph::real_time now = ceph::real_clock::now();
   uint64_t bytes_written = 0;
   uint64_t bytes_written_in_flush = 0;
@@ -1812,14 +1812,14 @@ private:
 void ObjectCacher::C_WaitForWrite::finish(int r)
 {
   std::lock_guard l(m_oc->lock);
-  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
+  m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
   m_onfinish->complete(r);
 }
 
-void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
-                                            ZTracer::Trace *trace)
+void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
+                                            ZTracer::Trace *trace)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph::mono_time start = ceph::mono_clock::now();
   int blocked = 0;
   // wait for writeback?
@@ -1842,10 +1842,12 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
-    flusher_cond.Signal();
+    flusher_cond.notify_all();
     stat_dirty_waiting += len;
     ++stat_nr_dirty_waiters;
-    stat_cond.Wait(lock);
+    std::unique_lock l{lock, std::adopt_lock};
+    stat_cond.wait(l);
+    l.release();
     stat_dirty_waiting -= len;
     --stat_nr_dirty_waiters;
     ++blocked;
@@ -1866,13 +1868,13 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph_assert(trace != nullptr);
   int ret = 0;
 
   if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
     if (block_writes_upfront) {
-      maybe_wait_for_writeback(len, trace);
+      _maybe_wait_for_writeback(len, trace);
       if (onfreespace)
        onfreespace->complete(0);
     } else {
@@ -1881,18 +1883,19 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
     }
   } else {
     // write-thru!  flush what we just wrote.
-    Cond cond;
+    ceph::condition_variable cond;
     bool done = false;
     Context *fin = block_writes_upfront ?
-      new C_Cond(&cond, &done, &ret) : onfreespace;
+      new C_Cond(cond, &done, &ret) : onfreespace;
     ceph_assert(fin);
     bool flushed = flush_set(oset, wr->extents, trace, fin);
     ceph_assert(!flushed);   // we just dirtied it, and didn't drop our lock!
     ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
     if (block_writes_upfront) {
-      while (!done)
-       cond.Wait(lock);
+      std::unique_lock l{lock, std::adopt_lock};
+      cond.wait(l, [&done] { return done; });
+      l.release();
       ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
       if (onfreespace)
        onfreespace->complete(ret);
@@ -1903,7 +1906,7 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
   if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
     ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
-    flusher_cond.Signal();
+    flusher_cond.notify_all();
   }
   return ret;
 }
@@ -1911,7 +1914,7 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
 void ObjectCacher::flusher_entry()
 {
   ldout(cct, 10) << "flusher start" << dendl;
-  lock.Lock();
+  std::unique_lock l{lock};
   while (!flusher_stop) {
     loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
       get_stat_dirty();
@@ -1959,8 +1962,8 @@ void ObjectCacher::flusher_entry()
       if (!max) {
        // back off the lock to avoid starving other threads
         trace.event("backoff");
-       lock.Unlock();
-       lock.Lock();
+       l.unlock();
+       l.lock();
        continue;
       }
     }
@@ -1969,7 +1972,7 @@ void ObjectCacher::flusher_entry()
     if (flusher_stop)
       break;
 
-    flusher_cond.WaitInterval(lock, seconds(1));
+    flusher_cond.wait_for(l, 1s);
   }
 
   /* Wait for reads to finish. This is only possible if handling
@@ -1978,13 +1981,15 @@ void ObjectCacher::flusher_entry()
    * the rados reads do come back their callback will try to access the
    * no-longer-valid ObjectCacher.
    */
-  while (reads_outstanding > 0) {
-    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
-                  << reads_outstanding << dendl;
-    read_cond.Wait(lock);
-  }
-
-  lock.Unlock();
+  read_cond.wait(l, [this] {
+    if (reads_outstanding > 0) {
+      ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
+                    << reads_outstanding << dendl;
+      return false;
+    } else {
+      return true;
+    }
+  });
   ldout(cct, 10) << "flusher finish" << dendl;
 }
 
@@ -1993,7 +1998,7 @@ void ObjectCacher::flusher_entry()
 
 bool ObjectCacher::set_is_empty(ObjectSet *oset)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (oset->objects.empty())
     return true;
 
@@ -2006,7 +2011,7 @@ bool ObjectCacher::set_is_empty(ObjectSet *oset)
 
 bool ObjectCacher::set_is_cached(ObjectSet *oset)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (oset->objects.empty())
     return false;
 
@@ -2027,7 +2032,7 @@ bool ObjectCacher::set_is_cached(ObjectSet *oset)
 
 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (oset->objects.empty())
     return false;
 
@@ -2051,7 +2056,7 @@ bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
 // purge.  non-blocking.  violently removes dirty buffers from cache.
 void ObjectCacher::purge(Object *ob)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 10) << "purge " << *ob << dendl;
 
   ob->truncate(0);
@@ -2066,7 +2071,7 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                          ZTracer::Trace *trace)
 {
   ceph_assert(trace != nullptr);
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   list<BufferHead*> blist;
   bool clean = true;
   ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
@@ -2101,7 +2106,7 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
                                     Context *onfinish)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (gather->has_subs()) {
     gather->set_finisher(onfinish);
     gather->activate();
@@ -2117,7 +2122,7 @@ bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
 // returns true if already flushed
 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph_assert(onfinish != NULL);
   if (oset->objects.empty()) {
     ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
@@ -2220,7 +2225,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
                             ZTracer::Trace *trace, Context *onfinish)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph_assert(trace != nullptr);
   ceph_assert(onfinish != NULL);
   if (oset->objects.empty()) {
@@ -2262,7 +2267,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
 // returns true if already flushed
 bool ObjectCacher::flush_all(Context *onfinish)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph_assert(onfinish != NULL);
 
   ldout(cct, 10) << "flush_all " << dendl;
@@ -2317,7 +2322,7 @@ bool ObjectCacher::flush_all(Context *onfinish)
 
 void ObjectCacher::purge_set(ObjectSet *oset)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   if (oset->objects.empty()) {
     ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
     return;
@@ -2343,7 +2348,7 @@ void ObjectCacher::purge_set(ObjectSet *oset)
 
 loff_t ObjectCacher::release(Object *ob)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   list<BufferHead*> clean;
   loff_t o_unclean = 0;
 
@@ -2385,7 +2390,7 @@ loff_t ObjectCacher::release(Object *ob)
 
 loff_t ObjectCacher::release_set(ObjectSet *oset)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   // return # bytes not clean (and thus not released).
   loff_t unclean = 0;
 
@@ -2424,7 +2429,7 @@ loff_t ObjectCacher::release_set(ObjectSet *oset)
 
 uint64_t ObjectCacher::release_all()
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 10) << "release_all" << dendl;
   uint64_t unclean = 0;
 
@@ -2460,7 +2465,7 @@ uint64_t ObjectCacher::release_all()
 
 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
 
   for (xlist<Object*>::iterator p = oset->objects.begin();
@@ -2485,7 +2490,7 @@ void ObjectCacher::clear_nonexistence(ObjectSet *oset)
  */
 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   bool was_dirty = oset->dirty_or_tx > 0;
 
   _discard(oset, exls, nullptr);
@@ -2501,7 +2506,7 @@ void ObjectCacher::discard_writeback(ObjectSet *oset,
                                      const vector<ObjectExtent>& exls,
                                      Context* on_finish)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   bool was_dirty = oset->dirty_or_tx > 0;
 
   C_GatherBuilder gather(cct);
@@ -2509,9 +2514,9 @@ void ObjectCacher::discard_writeback(ObjectSet *oset,
 
   if (gather.has_subs()) {
     bool flushed = was_dirty && oset->dirty_or_tx == 0;
-    gather.set_finisher(new FunctionContext(
+    gather.set_finisher(new LambdaContext(
       [this, oset, flushed, on_finish](int) {
-       ceph_assert(lock.is_locked());
+       ceph_assert(ceph_mutex_is_locked(lock));
        if (flushed && flush_set_callback)
          flush_set_callback(flush_set_callback_arg, oset);
        if (on_finish)
@@ -2548,7 +2553,7 @@ void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
 void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
                                    Context* on_finish)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
 
   // did we truncate off dirty data?
   if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
@@ -2563,7 +2568,7 @@ void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
 
 void ObjectCacher::verify_stats() const
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 10) << "verify_stats" << dendl;
 
   loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
@@ -2624,7 +2629,7 @@ void ObjectCacher::verify_stats() const
 
 void ObjectCacher::bh_stat_add(BufferHead *bh)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   switch (bh->get_state()) {
   case BufferHead::STATE_MISSING:
     stat_missing += bh->length();
@@ -2655,12 +2660,12 @@ void ObjectCacher::bh_stat_add(BufferHead *bh)
     ceph_abort_msg("bh_stat_add: invalid bufferhead state");
   }
   if (get_stat_dirty_waiting() > 0)
-    stat_cond.Signal();
+    stat_cond.notify_all();
 }
 
 void ObjectCacher::bh_stat_sub(BufferHead *bh)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   switch (bh->get_state()) {
   case BufferHead::STATE_MISSING:
     stat_missing -= bh->length();
@@ -2694,7 +2699,7 @@ void ObjectCacher::bh_stat_sub(BufferHead *bh)
 
 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   int state = bh->get_state();
   // move between lru lists?
   if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
@@ -2733,7 +2738,7 @@ void ObjectCacher::bh_set_state(BufferHead *bh, int s)
 
 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
   ob->add_bh(bh);
   if (bh->is_dirty()) {
@@ -2754,7 +2759,7 @@ void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
 
 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
 {
-  ceph_assert(lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(lock));
   ceph_assert(bh->get_journal_tid() == 0);
   ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
   ob->remove_bh(bh);
@@ -2770,6 +2775,6 @@ void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
   }
   bh_stat_sub(bh);
   if (get_stat_dirty_waiting() > 0)
-    stat_cond.Signal();
+    stat_cond.notify_all();
 }