]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/ObjectCacher.cc
update sources to v12.2.3
[ceph.git] / ceph / src / osdc / ObjectCacher.cc
index 9a0b6ee285192ba4fe4ca0780f0e729db47725dc..ff1bf87b17c9d8af463b41fd06d9d01e6b3e1332 100644 (file)
@@ -12,6 +12,7 @@
 #include "include/assert.h"
 
 #define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
+#define BUFFER_MEMORY_WEIGHT 12   // memory usage of BufferHead, count in (1<<n)
 
 using std::chrono::seconds;
                                 /// while holding the lock
@@ -36,19 +37,21 @@ class ObjectCacher::C_ReadFinish : public Context {
   xlist<C_ReadFinish*>::item set_item;
   bool trust_enoent;
   ceph_tid_t tid;
+  ZTracer::Trace trace;
 
 public:
   bufferlist bl;
   C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
-              uint64_t l) :
+              uint64_t l, const ZTracer::Trace &trace) :
     oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
     set_item(this), trust_enoent(true),
-    tid(t) {
+    tid(t), trace(trace) {
     ob->reads.push_back(&set_item);
   }
 
   void finish(int r) override {
     oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
+    trace.event("finish");
 
     // object destructor clears the list
     if (set_item.is_on_list())
@@ -65,18 +68,25 @@ class ObjectCacher::C_RetryRead : public Context {
   OSDRead *rd;
   ObjectSet *oset;
   Context *onfinish;
+  ZTracer::Trace trace;
 public:
-  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c)
-    : oc(_oc), rd(r), oset(os), onfinish(c) {}
+  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
+             const ZTracer::Trace &trace)
+    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
+  }
   void finish(int r) override {
-    if (r < 0) {
-      if (onfinish)
-        onfinish->complete(r);
+    if (r >= 0) {
+      r = oc->_readx(rd, oset, onfinish, false, &trace);
+    }
+
+    if (r == 0) {
+      // read is still in-progress
       return;
     }
-    int ret = oc->_readx(rd, oset, onfinish, false);
-    if (ret != 0 && onfinish) {
-      onfinish->complete(ret);
+
+    trace.event("finish");
+    if (onfinish) {
+      onfinish->complete(r);
     }
   }
 };
@@ -147,9 +157,6 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
 {
   assert(oc->lock.is_locked());
-  assert(left->end() == right->start());
-  assert(left->get_state() == right->get_state());
-  assert(left->can_merge_journal(right));
 
   ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
   if (left->get_journal_tid() == 0) {
@@ -186,6 +193,17 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
   ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
 }
 
+bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
+{
+  if (left->end() != right->start() ||
+      left->get_state() != right->get_state() ||
+      !left->can_merge_journal(right))
+    return false;
+  if (left->is_tx() && left->last_write_tid != right->last_write_tid)
+    return false;
+  return true;
+}
+
 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
 {
   assert(oc->lock.is_locked());
@@ -200,9 +218,7 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
   assert(p->second == bh);
   if (p != data.begin()) {
     --p;
-    if (p->second->end() == bh->start() &&
-       p->second->get_state() == bh->get_state() &&
-       p->second->can_merge_journal(bh)) {
+    if (can_merge_bh(p->second, bh)) {
       merge_left(p->second, bh);
       bh = p->second;
     } else {
@@ -212,10 +228,7 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
   // to the right?
   assert(p->second == bh);
   ++p;
-  if (p != data.end() &&
-      p->second->start() == bh->end() &&
-      p->second->get_state() == bh->get_state() &&
-      p->second->can_merge_journal(bh))
+  if (p != data.end() && can_merge_bh(bh, p->second))
     merge_left(bh, p->second);
 }
 
@@ -274,10 +287,9 @@ int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& errors)
 {
   assert(oc->lock.is_locked());
-  ldout(oc->cct, 10) << "map_read " << ex.oid 
-              << " " << ex.offset << "~" << ex.length
-              << dendl;
-  
+  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
+                     << ex.offset << "~" << ex.length << dendl;
+
   loff_t cur = ex.offset;
   loff_t left = ex.length;
 
@@ -302,7 +314,7 @@ int ObjectCacher::Object::map_read(ObjectExtent &ex,
       assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
       break;  // no more.
     }
-    
+
     if (p->first <= cur) {
       // have it (or part of it)
       BufferHead *e = p->second;
@@ -322,13 +334,13 @@ int ObjectCacher::Object::map_read(ObjectExtent &ex,
       } else {
         ceph_abort();
       }
-      
+
       loff_t lenfromcur = MIN(e->end() - cur, left);
       cur += lenfromcur;
       left -= lenfromcur;
       ++p;
       continue;  // more?
-      
+
     } else if (p->first > cur) {
       // gap.. miss
       loff_t next = p->first;
@@ -396,7 +408,7 @@ void ObjectCacher::Object::audit_buffers()
  * other dirty data to left and/or right.
  */
 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
-    ceph_tid_t tid)
+                                                         ceph_tid_t tid)
 {
   assert(oc->lock.is_locked());
   BufferHead *final = 0;
@@ -612,11 +624,13 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name,
     max_size(max_bytes), max_objects(max_objects),
     max_dirty_age(ceph::make_timespan(max_dirty_age)),
     block_writes_upfront(block_writes_upfront),
+    trace_endpoint("ObjectCacher"),
     flush_set_callback(flush_callback),
     flush_set_callback_arg(flush_callback_arg),
     last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
     stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
-    stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
+    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
+    stat_nr_dirty_waiters(0), reads_outstanding(0)
 {
   perf_start();
   finisher.start();
@@ -724,24 +738,32 @@ void ObjectCacher::close_object(Object *ob)
   delete ob;
 }
 
-void ObjectCacher::bh_read(BufferHead *bh, int op_flags)
+void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
+                           const ZTracer::Trace &parent_trace)
 {
   assert(lock.is_locked());
   ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;
 
+  ZTracer::Trace trace;
+  if (parent_trace.valid()) {
+    trace.init("", &trace_endpoint, &parent_trace);
+    trace.copy_name("bh_read " + bh->ob->get_oid().name);
+    trace.event("start");
+  }
+
   mark_rx(bh);
   bh->last_read_tid = ++last_read_tid;
 
   // finisher
   C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
-                                           bh->start(), bh->length());
+                                           bh->start(), bh->length(), trace);
   // go
   writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
-                        op_flags, onfinish);
+                        op_flags, trace, onfinish);
 
   ++reads_outstanding;
 }
@@ -979,11 +1001,12 @@ class ObjectCacher::C_WriteCommit : public Context {
   int64_t poolid;
   sobject_t oid;
   vector<pair<loff_t, uint64_t> > ranges;
+  ZTracer::Trace trace;
 public:
-  ceph_tid_t tid;
+  ceph_tid_t tid = 0;
   C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
-               uint64_t l) :
-    oc(c), poolid(_poolid), oid(o), tid(0) {
+               uint64_t l, const ZTracer::Trace &trace) :
+    oc(c), poolid(_poolid), oid(o), trace(trace) {
       ranges.push_back(make_pair(s, l));
     }
   C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
@@ -993,6 +1016,7 @@ public:
     }
   void finish(int r) override {
     oc->bh_write_commit(poolid, oid, ranges, tid, r);
+    trace.event("finish");
   }
 };
 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
@@ -1048,17 +1072,24 @@ void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
     perfcounter->inc(l_objectcacher_data_flushed, total_len);
 }
 
-void ObjectCacher::bh_write(BufferHead *bh)
+void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
 {
   assert(lock.is_locked());
   ldout(cct, 7) << "bh_write " << *bh << dendl;
 
   bh->ob->get();
 
+  ZTracer::Trace trace;
+  if (parent_trace.valid()) {
+    trace.init("", &trace_endpoint, &parent_trace);
+    trace.copy_name("bh_write " + bh->ob->get_oid().name);
+    trace.event("start");
+  }
+
   // finishers
   C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
-                                             bh->length());
+                                             bh->length(), trace);
   // go
   ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->ob->get_oloc(),
@@ -1066,7 +1097,7 @@ void ObjectCacher::bh_write(BufferHead *bh)
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
-                                          bh->journal_tid, oncommit);
+                                          bh->journal_tid, trace, oncommit);
   ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
 
   // set bh last_write_tid
@@ -1121,15 +1152,9 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
         ++p) {
       BufferHead *bh = p->second;
 
-      if (bh->start() > start+(loff_t)length)
+      if (bh->start() >= start+(loff_t)length)
        break;
 
-      if (bh->start() < start &&
-         bh->end() > start+(loff_t)length) {
-       ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
-       continue;
-      }
-
       // make sure bh is tx
       if (!bh->is_tx()) {
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
@@ -1143,6 +1168,10 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
        continue;
       }
 
+      // we don't merge tx buffers. tx buffer should be within the range
+      assert(bh->start() >= start);
+      assert(bh->end() <= start+(loff_t)length);
+
       if (r >= 0) {
        // ok!  mark bh clean and error-free
        mark_clean(bh);
@@ -1191,8 +1220,9 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
     finish_contexts(cct, ls, r);
 }
 
-void ObjectCacher::flush(loff_t amount)
+void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
 {
+  assert(trace != nullptr);
   assert(lock.is_locked());
   ceph::real_time cutoff = ceph::real_clock::now();
 
@@ -1215,9 +1245,9 @@ void ObjectCacher::flush(loff_t amount)
       bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
     } else {
       left -= bh->length();
-      bh_write(bh);
+      bh_write(bh, *trace);
     }
-  }    
+  }
 }
 
 
@@ -1228,7 +1258,11 @@ void ObjectCacher::trim()
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
 
-  while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size) {
+  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
+  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
+  while (get_stat_clean() > 0 &&
+        ((uint64_t)get_stat_clean() > max_size ||
+         nr_clean_bh > max_clean_bh)) {
     BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
     if (!bh)
       break;
@@ -1240,6 +1274,8 @@ void ObjectCacher::trim()
     bh_remove(ob, bh);
     delete bh;
 
+    --nr_clean_bh;
+
     if (ob->complete) {
       ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
       ob->complete = false;
@@ -1290,14 +1326,26 @@ bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
  *           must delete it)
  * returns 0 if doing async read
  */
-int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+                       ZTracer::Trace *parent_trace)
 {
-  return _readx(rd, oset, onfinish, true);
+  ZTracer::Trace trace;
+  if (parent_trace != nullptr) {
+    trace.init("read", &trace_endpoint, parent_trace);
+    trace.event("start");
+  }
+
+  int r =_readx(rd, oset, onfinish, true, &trace);
+  if (r < 0) {
+    trace.event("finish");
+  }
+  return r;
 }
 
 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
-                        bool external_call)
+                        bool external_call, ZTracer::Trace *trace)
 {
+  assert(trace != nullptr);
   assert(lock.is_locked());
   bool success = true;
   int error = 0;
@@ -1350,7 +1398,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
              if (scattered_write)
                blist.push_back(bh);
              else
-               bh_write(bh);
+               bh_write(bh, *trace);
            }
          }
        }
@@ -1360,7 +1408,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
          ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
-           new C_RetryRead(this,rd, oset, onfinish));
+           new C_RetryRead(this,rd, oset, onfinish, *trace));
          // FIXME: perfcounter!
          return 0;
        }
@@ -1417,14 +1465,15 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                           << waitfor_read.size() << " blocked reads, "
                           << (MAX(rx_bytes, max_size) - max_size)
                           << " read bytes" << dendl;
-           waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+           waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
+                                                  *trace));
          }
 
          bh_remove(o, bh_it->second);
          delete bh_it->second;
        } else {
          bh_it->second->set_nocache(nocache);
-         bh_read(bh_it->second, rd->fadvise_flags);
+         bh_read(bh_it->second, rd->fadvise_flags, *trace);
          if ((success && onfinish) || last != missing.end())
            last = bh_it;
        }
@@ -1436,7 +1485,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
          << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
-         new C_RetryRead(this, rd, oset, onfinish) );
+         new C_RetryRead(this, rd, oset, onfinish, *trace) );
 
       }
 
@@ -1449,7 +1498,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
-           new C_RetryRead(this, rd, oset, onfinish) );
+           new C_RetryRead(this, rd, oset, onfinish, *trace) );
        }
        bytes_not_in_cache += bh_it->second->length();
        success = false;
@@ -1616,7 +1665,8 @@ void ObjectCacher::retry_waiting_reads()
   waitfor_read.splice(waitfor_read.end(), ls);
 }
 
-int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+                        ZTracer::Trace *parent_trace)
 {
   assert(lock.is_locked());
   ceph::real_time now = ceph::real_clock::now();
@@ -1625,6 +1675,12 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
   bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
   bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
 
+  ZTracer::Trace trace;
+  if (parent_trace != nullptr) {
+    trace.init("write", &trace_endpoint, parent_trace);
+    trace.event("start");
+  }
+
   for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
        ex_it != wr->extents.end();
        ++ex_it) {
@@ -1637,7 +1693,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
     BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
     bool missing = bh->is_missing();
     bh->snapc = wr->snapc;
-    
+
     bytes_written += ex_it->length;
     if (bh->is_tx()) {
       bytes_written_in_flush += ex_it->length;
@@ -1696,7 +1752,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
     }
   }
 
-  int r = _wait_for_write(wr, bytes_written, oset, onfreespace);
+  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
   delete wr;
 
   //verify_stats();
@@ -1706,23 +1762,26 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
 
 class ObjectCacher::C_WaitForWrite : public Context {
 public:
-  C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
-    m_oc(oc), m_len(len), m_onfinish(onfinish) {}
+  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
+                 const ZTracer::Trace &trace, Context *onfinish) :
+    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
   void finish(int r) override;
 private:
   ObjectCacher *m_oc;
   uint64_t m_len;
+  ZTracer::Trace m_trace;
   Context *m_onfinish;
 };
 
 void ObjectCacher::C_WaitForWrite::finish(int r)
 {
   Mutex::Locker l(m_oc->lock);
-  m_oc->maybe_wait_for_writeback(m_len);
+  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
   m_onfinish->complete(r);
 }
 
-void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
+void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
+                                            ZTracer::Trace *trace)
 {
   assert(lock.is_locked());
   ceph::mono_time start = ceph::mono_clock::now();
@@ -1732,20 +1791,33 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
   //  - do not wait for bytes other waiters are waiting on.  this means that
   //    threads do not wait for each other.  this effectively allows the cache
   //    size to balloon proportional to the data that is in flight.
+
+  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
   while (get_stat_dirty() + get_stat_tx() > 0 &&
-        (uint64_t) (get_stat_dirty() + get_stat_tx()) >=
-        max_dirty + get_stat_dirty_waiting()) {
+        (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
+         max_dirty + get_stat_dirty_waiting()) ||
+        (dirty_or_tx_bh.size() >=
+         max_dirty_bh + get_stat_nr_dirty_waiters()))) {
+
+    if (blocked == 0) {
+      trace->event("start wait for writeback");
+    }
     ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
     flusher_cond.Signal();
     stat_dirty_waiting += len;
+    ++stat_nr_dirty_waiters;
     stat_cond.Wait(lock);
     stat_dirty_waiting -= len;
+    --stat_nr_dirty_waiters;
     ++blocked;
     ldout(cct, 10) << __func__ << " woke up" << dendl;
   }
+  if (blocked > 0) {
+    trace->event("finish wait for writeback");
+  }
   if (blocked && perfcounter) {
     perfcounter->inc(l_objectcacher_write_ops_blocked);
     perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
@@ -1756,19 +1828,20 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
 
 // blocking wait for write.
 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
-                                 Context *onfreespace)
+                                 ZTracer::Trace *trace, Context *onfreespace)
 {
   assert(lock.is_locked());
+  assert(trace != nullptr);
   int ret = 0;
 
   if (max_dirty > 0) {
     if (block_writes_upfront) {
-      maybe_wait_for_writeback(len);
+      maybe_wait_for_writeback(len, trace);
       if (onfreespace)
        onfreespace->complete(0);
     } else {
       assert(onfreespace);
-      finisher.queue(new C_WaitForWrite(this, len, onfreespace));
+      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
     }
   } else {
     // write-thru!  flush what we just wrote.
@@ -1777,7 +1850,7 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
     Context *fin = block_writes_upfront ?
       new C_Cond(&cond, &done, &ret) : onfreespace;
     assert(fin);
-    bool flushed = flush_set(oset, wr->extents, fin);
+    bool flushed = flush_set(oset, wr->extents, trace, fin);
     assert(!flushed);   // we just dirtied it, and didn't drop our lock!
     ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
@@ -1816,12 +1889,19 @@ void ObjectCacher::flusher_entry()
                   << max_dirty << " max)"
                   << dendl;
     loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
+
+    ZTracer::Trace trace;
+    if (cct->_conf->osdc_blkin_trace_all) {
+      trace.init("flusher", &trace_endpoint);
+      trace.event("start");
+    }
+
     if (actual > 0 && (uint64_t) actual > target_dirty) {
       // flush some dirty pages
       ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
-      flush(actual - target_dirty);
+      flush(&trace, actual - target_dirty);
     } else {
       // check tail of lru for old dirty items
       ceph::real_time cutoff = ceph::real_clock::now();
@@ -1836,17 +1916,20 @@ void ObjectCacher::flusher_entry()
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
         } else {
-         bh_write(bh);
+         bh_write(bh, trace);
          --max;
        }
       }
       if (!max) {
        // back off the lock to avoid starving other threads
+        trace.event("backoff");
        lock.Unlock();
        lock.Lock();
        continue;
       }
     }
+
+    trace.event("finish");
     if (flusher_stop)
       break;
 
@@ -1943,8 +2026,10 @@ void ObjectCacher::purge(Object *ob)
 // true if clean, already flushed.
 // false if we wrote something.
 // be sloppy about the ranges and flush any buffer it touches
-bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
+                         ZTracer::Trace *trace)
 {
+  assert(trace != nullptr);
   assert(lock.is_locked());
   list<BufferHead*> blist;
   bool clean = true;
@@ -1968,7 +2053,7 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
     if (scattered_write)
       blist.push_back(bh);
     else
-      bh_write(bh);
+      bh_write(bh, *trace);
     clean = false;
   }
   if (scattered_write && !blist.empty())
@@ -2044,7 +2129,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
        }
        blist.push_back(bh);
       } else {
-       bh_write(bh);
+       bh_write(bh, {});
       }
     }
   }
@@ -2070,7 +2155,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
          }
          blist.push_front(bh);
        } else {
-         bh_write(bh);
+         bh_write(bh, {});
        }
       }
       if (!backwards)
@@ -2097,9 +2182,10 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
 // flush.  non-blocking, takes callback.
 // returns true if already flushed
 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
-                            Context *onfinish)
+                            ZTracer::Trace *trace, Context *onfinish)
 {
   assert(lock.is_locked());
+  assert(trace != nullptr);
   assert(onfinish != NULL);
   if (oset->objects.empty()) {
     ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
@@ -2125,7 +2211,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
     ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
                   << " " << ob << dendl;
 
-    if (!flush(ob, ex.offset, ex.length)) {
+    if (!flush(ob, ex.offset, ex.length, trace)) {
       // we'll need to gather...
       ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                     << ob->last_write_tid << " on " << *ob << dendl;
@@ -2169,7 +2255,7 @@ bool ObjectCacher::flush_all(Context *onfinish)
        }
        blist.push_back(bh);
       } else {
-       bh_write(bh);
+       bh_write(bh, {});
       }
     }