]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/ObjectCacher.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / osdc / ObjectCacher.h
index 31201a7235422df27ed375b6c26776721e43d310..2101692e14054311ae1b510139cfb19bfe09cad8 100644 (file)
@@ -7,6 +7,7 @@
 #include "include/lru.h"
 #include "include/Context.h"
 #include "include/xlist.h"
+#include "include/common_fwd.h"
 
 #include "common/Cond.h"
 #include "common/Finisher.h"
@@ -16,9 +17,7 @@
 #include "Objecter.h"
 #include "Striper.h"
 
-class CephContext;
 class WritebackHandler;
-class PerfCounters;
 
 enum {
   l_objectcacher_first = 25000,
@@ -62,34 +61,34 @@ class ObjectCacher {
 
   // read scatter/gather
   struct OSDRead {
-    vector<ObjectExtent> extents;
+    std::vector<ObjectExtent> extents;
     snapid_t snap;
-    bufferlist *bl;
+    ceph::buffer::list *bl;
     int fadvise_flags;
-    OSDRead(snapid_t s, bufferlist *b, int f)
+    OSDRead(snapid_t s, ceph::buffer::list *b, int f)
       : snap(s), bl(b), fadvise_flags(f) {}
   };
 
-  OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const {
+  OSDRead *prepare_read(snapid_t snap, ceph::buffer::list *b, int f) const {
     return new OSDRead(snap, b, f);
   }
 
   // write scatter/gather
   struct OSDWrite {
-    vector<ObjectExtent> extents;
+    std::vector<ObjectExtent> extents;
     SnapContext snapc;
-    bufferlist bl;
+    ceph::buffer::list bl;
     ceph::real_time mtime;
     int fadvise_flags;
     ceph_tid_t journal_tid;
-    OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
+    OSDWrite(const SnapContext& sc, const ceph::buffer::list& b, ceph::real_time mt,
             int f, ceph_tid_t _journal_tid)
       : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
        journal_tid(_journal_tid) {}
   };
 
   OSDWrite *prepare_write(const SnapContext& sc,
-                         const bufferlist &b,
+                         const ceph::buffer::list &b,
                          ceph::real_time mt,
                          int f,
                          ceph_tid_t journal_tid) const {
@@ -122,7 +121,7 @@ class ObjectCacher {
 
   public:
     Object *ob;
-    bufferlist  bl;
+    ceph::buffer::list  bl;
     ceph_tid_t last_write_tid;  // version of bh (if non-zero)
     ceph_tid_t last_read_tid;   // tid of last read op (if any)
     ceph::real_time last_write;
@@ -130,7 +129,7 @@ class ObjectCacher {
     ceph_tid_t journal_tid;
     int error; // holds return value for failed reads
 
-    map<loff_t, list<Context*> > waitfor_read;
+    std::map<loff_t, std::list<Context*> > waitfor_read;
 
     // cons
     explicit BufferHead(Object *o) :
@@ -162,6 +161,13 @@ class ObjectCacher {
     }
     int get_state() const { return state; }
 
+    inline int get_error() const {
+      return error;
+    }
+    inline void set_error(int _error) {
+      error = _error;
+    }
+
     inline ceph_tid_t get_journal_tid() const {
       return journal_tid;
     }
@@ -179,12 +185,12 @@ class ObjectCacher {
 
     // reference counting
     int get() {
-      assert(ref >= 0);
+      ceph_assert(ref >= 0);
       if (ref == 0) lru_pin();
       return ++ref;
     }
     int put() {
-      assert(ref > 0);
+      ceph_assert(ref > 0);
       if (ref == 1) lru_unpin();
       --ref;
       return ref;
@@ -244,14 +250,14 @@ class ObjectCacher {
     bool complete;
     bool exists;
 
-    map<loff_t, BufferHead*>     data;
+    std::map<loff_t, BufferHead*>     data;
 
     ceph_tid_t last_write_tid;  // version of bh (if non-zero)
-    ceph_tid_t last_commit_tid; // last update commited.
+    ceph_tid_t last_commit_tid; // last update committed.
 
     int dirty_or_tx;
 
-    map< ceph_tid_t, list<Context*> > waitfor_commit;
+    std::map< ceph_tid_t, std::list<Context*> > waitfor_commit;
     xlist<C_ReadFinish*> reads;
 
     Object(const Object&) = delete;
@@ -271,9 +277,9 @@ class ObjectCacher {
     }
     ~Object() {
       reads.clear();
-      assert(ref == 0);
-      assert(data.empty());
-      assert(dirty_or_tx == 0);
+      ceph_assert(ref == 0);
+      ceph_assert(data.empty());
+      ceph_assert(dirty_or_tx == 0);
       set_item.remove_myself();
     }
 
@@ -281,7 +287,7 @@ class ObjectCacher {
     object_t get_oid() { return oid.oid; }
     snapid_t get_snap() { return oid.snap; }
     ObjectSet *get_object_set() const { return oset; }
-    string get_namespace() { return oloc.nspace; }
+    std::string get_namespace() { return oloc.nspace; }
     uint64_t get_object_number() const { return object_no; }
 
     const object_locator_t& get_oloc() const { return oloc; }
@@ -289,8 +295,8 @@ class ObjectCacher {
 
     bool can_close() const {
       if (lru_is_expireable()) {
-       assert(data.empty());
-       assert(waitfor_commit.empty());
+       ceph_assert(data.empty());
+       ceph_assert(waitfor_commit.empty());
        return true;
       }
       return false;
@@ -310,8 +316,8 @@ class ObjectCacher {
      * @param offset object byte offset
      * @return iterator pointing to buffer, or data.end()
      */
-    map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
-      map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset);
+    std::map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
+      auto p = data.lower_bound(offset);
       if (p != data.begin() &&
          (p == data.end() || p->first > offset)) {
        --p;     // might overlap!
@@ -326,11 +332,11 @@ class ObjectCacher {
     void add_bh(BufferHead *bh) {
       if (data.empty())
        get();
-      assert(data.count(bh->start()) == 0);
+      ceph_assert(data.count(bh->start()) == 0);
       data[bh->start()] = bh;
     }
     void remove_bh(BufferHead *bh) {
-      assert(data.count(bh->start()));
+      ceph_assert(data.count(bh->start()));
       data.erase(bh->start());
       if (data.empty())
        put();
@@ -341,29 +347,31 @@ class ObjectCacher {
     // mid-level
     BufferHead *split(BufferHead *bh, loff_t off);
     void merge_left(BufferHead *left, BufferHead *right);
+    bool can_merge_bh(BufferHead *left, BufferHead *right);
     void try_merge_bh(BufferHead *bh);
+    void maybe_rebuild_buffer(BufferHead *bh);
 
     bool is_cached(loff_t off, loff_t len) const;
     bool include_all_cached_data(loff_t off, loff_t len);
     int map_read(ObjectExtent &ex,
-                 map<loff_t, BufferHead*>& hits,
-                 map<loff_t, BufferHead*>& missing,
-                 map<loff_t, BufferHead*>& rx,
-                map<loff_t, BufferHead*>& errors);
+                 std::map<loff_t, BufferHead*>& hits,
+                 std::map<loff_t, BufferHead*>& missing,
+                 std::map<loff_t, BufferHead*>& rx,
+                std::map<loff_t, BufferHead*>& errors);
     BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
 
     void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
     void truncate(loff_t s);
-    void discard(loff_t off, loff_t len);
+    void discard(loff_t off, loff_t len, C_GatherBuilder* commit_gather);
 
     // reference counting
     int get() {
-      assert(ref >= 0);
+      ceph_assert(ref >= 0);
       if (ref == 0) lru_pin();
       return ++ref;
     }
     int put() {
-      assert(ref > 0);
+      ceph_assert(ref > 0);
       if (ref == 1) lru_unpin();
       --ref;
       return ref;
@@ -397,8 +405,8 @@ class ObjectCacher {
   WritebackHandler& writeback_handler;
   bool scattered_write;
 
-  string name;
-  Mutex& lock;
+  std::string name;
+  ceph::mutex& lock;
 
   uint64_t max_dirty, target_dirty, max_size, max_objects;
   ceph::timespan max_dirty_age;
@@ -410,17 +418,17 @@ class ObjectCacher {
   void *flush_set_callback_arg;
 
   // indexed by pool_id
-  vector<ceph::unordered_map<sobject_t, Object*> > objects;
+  std::vector<ceph::unordered_map<sobject_t, Object*> > objects;
 
-  list<Context*> waitfor_read;
+  std::list<Context*> waitfor_read;
 
   ceph_tid_t last_read_tid;
 
-  set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
+  std::set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
   LRU   bh_lru_dirty, bh_lru_rest;
   LRU   ob_lru;
 
-  Cond flusher_cond;
+  ceph::condition_variable flusher_cond;
   bool flusher_stop;
   void flusher_entry();
   class FlusherThread : public Thread {
@@ -450,7 +458,7 @@ class ObjectCacher {
   void close_object(Object *ob);
 
   // bh stats
-  Cond  stat_cond;
+  ceph::condition_variable  stat_cond;
 
   loff_t stat_clean;
   loff_t stat_zero;
@@ -461,6 +469,8 @@ class ObjectCacher {
   loff_t stat_error;
   loff_t stat_dirty_waiting;   // bytes that writers are waiting on to write
 
+  size_t stat_nr_dirty_waiters;
+
   void verify_stats() const;
 
   void bh_stat_add(BufferHead *bh);
@@ -468,9 +478,10 @@ class ObjectCacher {
   loff_t get_stat_tx() const { return stat_tx; }
   loff_t get_stat_rx() const { return stat_rx; }
   loff_t get_stat_dirty() const { return stat_dirty; }
-  loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
   loff_t get_stat_clean() const { return stat_clean; }
   loff_t get_stat_zero() const { return stat_zero; }
+  loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
+  size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
 
   void touch_bh(BufferHead *bh) {
     if (bh->is_dirty())
@@ -525,7 +536,7 @@ class ObjectCacher {
   void bh_read(BufferHead *bh, int op_flags,
                const ZTracer::Trace &parent_trace);
   void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
-  void bh_write_scattered(list<BufferHead*>& blist);
+  void bh_write_scattered(std::list<BufferHead*>& blist);
   void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                            int64_t *amount, int *max_count);
 
@@ -549,7 +560,7 @@ class ObjectCacher {
   void purge(Object *o);
 
   int64_t reads_outstanding;
-  Cond read_cond;
+  ceph::condition_variable read_cond;
 
   int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
             bool external_call, ZTracer::Trace *trace);
@@ -558,10 +569,10 @@ class ObjectCacher {
  public:
   void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
                      loff_t offset, uint64_t length,
-                     bufferlist &bl, int r,
+                     ceph::buffer::list &bl, int r,
                      bool trust_enoent);
   void bh_write_commit(int64_t poolid, sobject_t oid,
-                      vector<pair<loff_t, uint64_t> >& ranges,
+                      std::vector<std::pair<loff_t, uint64_t> >& ranges,
                       ceph_tid_t t, int r);
 
   class C_WriteCommit;
@@ -572,7 +583,7 @@ class ObjectCacher {
 
 
 
-  ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, Mutex& l,
+  ObjectCacher(CephContext *cct_, std::string name, WritebackHandler& wb, ceph::mutex& l,
               flush_set_callback_t flush_callback,
               void *flush_callback_arg,
               uint64_t max_bytes, uint64_t max_objects,
@@ -584,11 +595,11 @@ class ObjectCacher {
     flusher_thread.create("flusher");
   }
   void stop() {
-    assert(flusher_thread.is_started());
-    lock.Lock();  // hmm.. watch out for deadlock!
+    ceph_assert(flusher_thread.is_started());
+    lock.lock();  // hmm.. watch out for deadlock!
     flusher_stop = true;
-    flusher_cond.Signal();
-    lock.Unlock();
+    flusher_cond.notify_all();
+    lock.unlock();
     flusher_thread.join();
   }
 
@@ -606,23 +617,27 @@ class ObjectCacher {
            ZTracer::Trace *parent_trace = nullptr);
   int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
             ZTracer::Trace *parent_trace = nullptr);
-  bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
+  bool is_cached(ObjectSet *oset, std::vector<ObjectExtent>& extents,
                 snapid_t snapid);
 
 private:
   // write blocking
   int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                       ZTracer::Trace *trace, Context *onfreespace);
-  void maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
+  void _maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
   bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
 
+  void _discard(ObjectSet *oset, const std::vector<ObjectExtent>& exls,
+                C_GatherBuilder* gather);
+  void _discard_finish(ObjectSet *oset, bool was_dirty, Context* on_finish);
+
 public:
   bool set_is_empty(ObjectSet *oset);
   bool set_is_cached(ObjectSet *oset);
   bool set_is_dirty_or_committing(ObjectSet *oset);
 
   bool flush_set(ObjectSet *oset, Context *onfinish=0);
-  bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
+  bool flush_set(ObjectSet *oset, std::vector<ObjectExtent>& ex,
                  ZTracer::Trace *trace, Context *onfinish = 0);
   bool flush_all(Context *onfinish = 0);
 
@@ -632,7 +647,9 @@ public:
   loff_t release_set(ObjectSet *oset);
   uint64_t release_all();
 
-  void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex);
+  void discard_set(ObjectSet *oset, const std::vector<ObjectExtent>& ex);
+  void discard_writeback(ObjectSet *oset, const std::vector<ObjectExtent>& ex,
+                         Context* on_finish);
 
   /**
    * Retry any in-flight reads that get -ENOENT instead of marking
@@ -654,7 +671,7 @@ public:
     max_size = v;
   }
   void set_max_dirty_age(double a) {
-    max_dirty_age = make_timespan(a);
+    max_dirty_age = ceph::make_timespan(a);
   }
   void set_max_objects(int64_t v) {
     max_objects = v;
@@ -666,14 +683,14 @@ public:
   /*** async+caching (non-blocking) file interface ***/
   int file_is_cached(ObjectSet *oset, file_layout_t *layout,
                     snapid_t snapid, loff_t offset, uint64_t len) {
-    vector<ObjectExtent> extents;
+    std::vector<ObjectExtent> extents;
     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
                             oset->truncate_size, extents);
     return is_cached(oset, extents, snapid);
   }
 
   int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
-               loff_t offset, uint64_t len, bufferlist *bl, int flags,
+               loff_t offset, uint64_t len, ceph::buffer::list *bl, int flags,
                Context *onfinish) {
     OSDRead *rd = prepare_read(snapid, bl, flags);
     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
@@ -683,17 +700,17 @@ public:
 
   int file_write(ObjectSet *oset, file_layout_t *layout,
                 const SnapContext& snapc, loff_t offset, uint64_t len,
-                bufferlist& bl, ceph::real_time mtime, int flags) {
+                ceph::buffer::list& bl, ceph::real_time mtime, int flags) {
     OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
                             oset->truncate_size, wr->extents);
-    return writex(wr, oset, NULL);
+    return writex(wr, oset, nullptr);
   }
 
   bool file_flush(ObjectSet *oset, file_layout_t *layout,
                  const SnapContext& snapc, loff_t offset, uint64_t len,
                  Context *onfinish) {
-    vector<ObjectExtent> extents;
+    std::vector<ObjectExtent> extents;
     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
                             oset->truncate_size, extents);
     ZTracer::Trace trace;
@@ -702,7 +719,8 @@ public:
 };
 
 
-inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
+inline std::ostream& operator<<(std::ostream &out,
+                               const ObjectCacher::BufferHead &bh)
 {
   out << "bh[ " << &bh << " "
       << bh.start() << "~" << bh.length()
@@ -722,11 +740,9 @@ inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
   if (bh.error) out << " error=" << bh.error;
   out << "]";
   out << " waiters = {";
-  for (map<loff_t, list<Context*> >::const_iterator it
-        = bh.waitfor_read.begin();
-       it != bh.waitfor_read.end(); ++it) {
+  for (auto it = bh.waitfor_read.begin(); it != bh.waitfor_read.end(); ++it) {
     out << " " << it->first << "->[";
-    for (list<Context*>::const_iterator lit = it->second.begin();
+    for (auto lit = it->second.begin();
         lit != it->second.end(); ++lit) {
         out << *lit << ", ";
     }
@@ -736,7 +752,8 @@ inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
   return out;
 }
 
-inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
+inline std::ostream& operator<<(std::ostream &out,
+                               const ObjectCacher::ObjectSet &os)
 {
   return out << "objectset[" << os.ino
             << " ts " << os.truncate_seq << "/" << os.truncate_size
@@ -745,10 +762,11 @@ inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
             << "]";
 }
 
-inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob)
+inline std::ostream& operator<<(std::ostream &out,
+                               const ObjectCacher::Object &ob)
 {
   out << "object["
-      << ob.get_soid() << " oset " << ob.oset << dec
+      << ob.get_soid() << " oset " << ob.oset << std::dec
       << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
 
   if (ob.complete)