]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/buffer.cc
update sources to v12.1.0
[ceph.git] / ceph / src / common / buffer.cc
index bf4b0af25d067e581ecafaa0e74b1cd29d0cb686..44ce5b016ece4fc5f8b5240a5ddf4bba3a85f955 100644 (file)
  * 
  */
 
+#include <atomic>
+#include <errno.h>
+#include <limits.h>
+
+#include <sys/uio.h>
+
 #include "include/compat.h"
 #include "include/mempool.h"
 #include "armor.h"
 #include "common/likely.h"
 #include "common/valgrind.h"
 #include "common/deleter.h"
-#include "include/atomic.h"
 #include "common/RWLock.h"
 #include "include/types.h"
-#include "include/compat.h"
-#include "include/inline_memory.h"
 #include "include/scope_guard.h"
+
 #if defined(HAVE_XIO)
 #include "msg/xio/XioMsg.h"
 #endif
 
-#include <errno.h>
-#include <fstream>
-#include <sstream>
-#include <sys/uio.h>
-#include <limits.h>
-
-#include <atomic>
-#include <ostream>
+using namespace ceph;
 
 #define CEPH_BUFFER_ALLOC_UNIT  (MIN(CEPH_PAGE_SIZE, 4096))
 #define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))
@@ -54,71 +51,73 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 # define bendl std::endl; }
 #endif
 
-  static atomic_t buffer_total_alloc;
-  static atomic64_t buffer_history_alloc_bytes;
-  static atomic64_t buffer_history_alloc_num;
+  static std::atomic<uint64_t> buffer_total_alloc { 0 };
+  static std::atomic<uint64_t> buffer_history_alloc_bytes { 0 };
+  static std::atomic<uint64_t> buffer_history_alloc_num { 0 };
+
   const bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK");
 
   namespace {
   void inc_total_alloc(unsigned len) {
     if (buffer_track_alloc)
-      buffer_total_alloc.add(len);
+      buffer_total_alloc += len;
   }
 
   void dec_total_alloc(unsigned len) {
     if (buffer_track_alloc)
-      buffer_total_alloc.sub(len);
+      buffer_total_alloc -= len;
   }
 
   void inc_history_alloc(uint64_t len) {
     if (buffer_track_alloc) {
-      buffer_history_alloc_bytes.add(len);
-      buffer_history_alloc_num.inc();
+      buffer_history_alloc_bytes += len;
+      buffer_history_alloc_num++;
     }
   }
-  }
-
+  } // namespace
 
   int buffer::get_total_alloc() {
-    return buffer_total_alloc.read();
+    return buffer_total_alloc;
   }
   uint64_t buffer::get_history_alloc_bytes() {
-    return buffer_history_alloc_bytes.read();
+    return buffer_history_alloc_bytes;
   }
   uint64_t buffer::get_history_alloc_num() {
-    return buffer_history_alloc_num.read();
+    return buffer_history_alloc_num;
   }
 
-  static atomic_t buffer_cached_crc;
-  static atomic_t buffer_cached_crc_adjusted;
-  static atomic_t buffer_missed_crc;
+  static std::atomic<unsigned> buffer_cached_crc { 0 };
+  static std::atomic<unsigned> buffer_cached_crc_adjusted { 0 };
+  static std::atomic<unsigned> buffer_missed_crc { 0 };
+
   static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK");
 
   void buffer::track_cached_crc(bool b) {
     buffer_track_crc = b;
   }
   int buffer::get_cached_crc() {
-    return buffer_cached_crc.read();
+    return buffer_cached_crc;
   }
   int buffer::get_cached_crc_adjusted() {
-    return buffer_cached_crc_adjusted.read();
+    return buffer_cached_crc_adjusted;
   }
 
   int buffer::get_missed_crc() {
-    return buffer_missed_crc.read();
+    return buffer_missed_crc;
   }
 
-  static atomic_t buffer_c_str_accesses;
+  static std::atomic<unsigned> buffer_c_str_accesses { 0 };
+
   static bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK");
 
   void buffer::track_c_str(bool b) {
     buffer_track_c_str = b;
   }
   int buffer::get_c_str_accesses() {
-    return buffer_c_str_accesses.read();
+    return buffer_c_str_accesses;
   }
 
-  static atomic_t buffer_max_pipe_size;
+  static std::atomic<unsigned> buffer_max_pipe_size { 0 };
   int update_max_pipe_size() {
 #ifdef CEPH_HAVE_SETPIPE_SZ
     char buf[32];
@@ -135,18 +134,18 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     size_t size = strict_strtol(buf, 10, &err);
     if (!err.empty())
       return -EIO;
-    buffer_max_pipe_size.set(size);
+    buffer_max_pipe_size = size;
 #endif
     return 0;
   }
 
   size_t get_max_pipe_size() {
 #ifdef CEPH_HAVE_SETPIPE_SZ
-    size_t size = buffer_max_pipe_size.read();
+    size_t size = buffer_max_pipe_size;
     if (size)
       return size;
     if (update_max_pipe_size() == 0)
-      return buffer_max_pipe_size.read();
+      return buffer_max_pipe_size;
 #endif
     // this is the max size hardcoded in linux before 2.6.35
     return 65536;
@@ -171,18 +170,47 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   public:
     char *data;
     unsigned len;
-    atomic_t nref;
+    std::atomic<unsigned> nref { 0 };
+    int mempool = mempool::mempool_buffer_anon;
 
     mutable std::atomic_flag crc_spinlock = ATOMIC_FLAG_INIT;
     map<pair<size_t, size_t>, pair<uint32_t, uint32_t> > crc_map;
 
     explicit raw(unsigned l)
-      : data(NULL), len(l), nref(0)
-    { }
+      : data(NULL), len(l), nref(0) {
+      mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
+    }
     raw(char *c, unsigned l)
-      : data(c), len(l), nref(0)
-    { }
-    virtual ~raw() {}
+      : data(c), len(l), nref(0) {
+      mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
+    }
+    virtual ~raw() {
+      mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
+       -1, -(int)len);
+    }
+
+    void _set_len(unsigned l) {
+      mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
+       -1, -(int)len);
+      len = l;
+      mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
+    }
+
+    void reassign_to_mempool(int pool) {
+      if (pool == mempool) {
+       return;
+      }
+      mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
+       -1, -(int)len);
+      mempool = pool;
+      mempool::get_pool(mempool::pool_index_t(pool)).adjust_count(1, len);
+    }
+
+    void try_assign_to_mempool(int pool) {
+      if (mempool == mempool::mempool_buffer_anon) {
+       reassign_to_mempool(pool);
+      }
+    }
 
     // no copying.
     // cppcheck-suppress noExplicitConstructor
@@ -244,8 +272,6 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     }
   };
 
-  MEMPOOL_DEFINE_FACTORY(char, char, buffer_data);
-
   /*
    * raw_combined is always placed within a single allocation along
    * with the data buffer.  the data goes at the beginning, and
@@ -274,8 +300,14 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
                                  alignof(buffer::raw_combined));
       size_t datalen = ROUND_UP_TO(len, alignof(buffer::raw_combined));
 
-      char *ptr = mempool::buffer_data::alloc_char.allocate_aligned(
-       rawlen + datalen, align);
+#ifdef DARWIN
+      char *ptr = (char *) valloc(rawlen + datalen);
+#else
+      char *ptr = 0;
+      int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
+      if (r)
+       throw bad_alloc();
+#endif /* DARWIN */
       if (!ptr)
        throw bad_alloc();
 
@@ -286,11 +318,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 
     static void operator delete(void *ptr) {
       raw_combined *raw = (raw_combined *)ptr;
-      size_t rawlen = ROUND_UP_TO(sizeof(buffer::raw_combined),
-                                 alignof(buffer::raw_combined));
-      size_t datalen = ROUND_UP_TO(raw->len, alignof(buffer::raw_combined));
-      mempool::buffer_data::alloc_char.deallocate_aligned(
-       raw->data, rawlen + datalen);
+      ::free((void *)raw->data);
     }
   };
 
@@ -355,7 +383,13 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
       align = _align;
       assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
-      data = mempool::buffer_data::alloc_char.allocate_aligned(len, align);
+#ifdef DARWIN
+      data = (char *) valloc(len);
+#else
+      int r = ::posix_memalign((void**)(void*)&data, align, len);
+      if (r)
+       throw bad_alloc();
+#endif /* DARWIN */
       if (!data)
        throw bad_alloc();
       inc_total_alloc(len);
@@ -363,7 +397,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
       bdout << "raw_posix_aligned " << this << " alloc " << (void *)data << " l=" << l << ", align=" << align << " total_alloc=" << buffer::get_total_alloc() << bendl;
     }
     ~raw_posix_aligned() override {
-      mempool::buffer_data::alloc_char.deallocate_aligned(data, len);
+      ::free(data);
       dec_total_alloc(len);
       bdout << "raw_posix_aligned " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
     }
@@ -466,7 +500,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
        return r;
       }
       // update length with actual amount read
-      len = r;
+      _set_len(r);
       return 0;
     }
 
@@ -589,7 +623,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 
     explicit raw_char(unsigned l) : raw(l) {
       if (len)
-       data = mempool::buffer_data::alloc_char.allocate(len);
+       data = new char[len];
       else
        data = 0;
       inc_total_alloc(len);
@@ -601,8 +635,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
       bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
     }
     ~raw_char() override {
-      if (data)
-       mempool::buffer_data::alloc_char.deallocate(data, len);
+      delete[] data;
       dec_total_alloc(len);
       bdout << "raw_char " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
     }
@@ -611,6 +644,25 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     }
   };
 
+  class buffer::raw_claimed_char : public buffer::raw {
+  public:
+    MEMPOOL_CLASS_HELPERS();
+
+    explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
+      inc_total_alloc(len);
+      bdout << "raw_claimed_char " << this << " alloc " << (void *)data
+           << " " << l << " " << buffer::get_total_alloc() << bendl;
+    }
+    ~raw_claimed_char() override {
+      dec_total_alloc(len);
+      bdout << "raw_claimed_char " << this << " free " << (void *)data
+           << " " << buffer::get_total_alloc() << bendl;
+    }
+    raw* clone_empty() override {
+      return new raw_char(len);
+    }
+  };
+
   class buffer::raw_unshareable : public buffer::raw {
   public:
     MEMPOOL_CLASS_HELPERS();
@@ -719,7 +771,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     return buffer::create_aligned(len, sizeof(size_t));
   }
   buffer::raw* buffer::claim_char(unsigned len, char *buf) {
-    return new raw_char(len, buf);
+    return new raw_claimed_char(len, buf);
   }
   buffer::raw* buffer::create_malloc(unsigned len) {
     return new raw_malloc(len);
@@ -779,25 +831,25 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 
   buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len)   // no lock needed; this is an unref raw.
   {
-    r->nref.inc();
+    r->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
   {
     _raw = create(l);
-    _raw->nref.inc();
+    _raw->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l)    // ditto.
   {
     _raw = copy(d, l);
-    _raw->nref.inc();
+    _raw->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len)
   {
     if (_raw) {
-      _raw->nref.inc();
+      _raw->nref++;
       bdout << "ptr " << this << " get " << _raw << bendl;
     }
   }
@@ -811,13 +863,13 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   {
     assert(o+l <= p._len);
     assert(_raw);
-    _raw->nref.inc();
+    _raw->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr& buffer::ptr::operator= (const ptr& p)
   {
     if (p._raw) {
-      p._raw->nref.inc();
+      p._raw->nref++;
       bdout << "ptr " << this << " get " << _raw << bendl;
     }
     buffer::raw *raw = p._raw; 
@@ -856,8 +908,8 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     if (_raw && !_raw->is_shareable()) {
       buffer::raw *tr = _raw;
       _raw = tr->clone();
-      _raw->nref.set(1);
-      if (unlikely(tr->nref.dec() == 0)) {
+      _raw->nref = 1;
+      if (unlikely(--tr->nref == 0)) {
         ANNOTATE_HAPPENS_AFTER(&tr->nref);
         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&tr->nref);
         delete tr;
@@ -885,7 +937,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   {
     if (_raw) {
       bdout << "ptr " << this << " release " << _raw << bendl;
-      if (_raw->nref.dec() == 0) {
+      if (--_raw->nref == 0) {
        //cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
         ANNOTATE_HAPPENS_AFTER(&_raw->nref);
         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
@@ -902,25 +954,25 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   const char *buffer::ptr::c_str() const {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off;
   }
   char *buffer::ptr::c_str() {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off;
   }
   const char *buffer::ptr::end_c_str() const {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off + _len;
   }
   char *buffer::ptr::end_c_str() {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off + _len;
   }
 
@@ -946,7 +998,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 
   const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; }
   unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; }
-  int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref.read(); }
+  int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref; }
 
   void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
     assert(_raw);
@@ -956,9 +1008,8 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     maybe_inline_memcpy(dest, src, l, 8);
   }
 
-  unsigned buffer::ptr::wasted()
+  unsigned buffer::ptr::wasted() const
   {
-    assert(_raw);
     return _raw->len - _len;
   }
 
@@ -1441,6 +1492,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   {
     std::swap(_len, other._len);
     std::swap(_memcopy_count, other._memcopy_count);
+    std::swap(_mempool, other._mempool);
     _buffers.swap(other._buffers);
     append_buffer.swap(other.append_buffer);
     //last_p.swap(other.last_p);
@@ -1613,6 +1665,28 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     return is_aligned(CEPH_PAGE_SIZE);
   }
 
+  void buffer::list::reassign_to_mempool(int pool)
+  {
+    _mempool = pool;
+    if (append_buffer.get_raw()) {
+      append_buffer.get_raw()->reassign_to_mempool(pool);
+    }
+    for (auto& p : _buffers) {
+      p.get_raw()->reassign_to_mempool(pool);
+    }
+  }
+
+  void buffer::list::try_assign_to_mempool(int pool)
+  {
+    _mempool = pool;
+    if (append_buffer.get_raw()) {
+      append_buffer.get_raw()->try_assign_to_mempool(pool);
+    }
+    for (auto& p : _buffers) {
+      p.get_raw()->try_assign_to_mempool(pool);
+    }
+  }
+
   void buffer::list::rebuild()
   {
     if (_len == 0) {
@@ -1700,6 +1774,17 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
    return  rebuild_aligned(CEPH_PAGE_SIZE);
   }
 
+  void buffer::list::reserve(size_t prealloc)
+  {
+    if (append_buffer.unused_tail_length() < prealloc) {
+      append_buffer = buffer::create(prealloc);
+      if (_mempool >= 0) {
+       append_buffer.get_raw()->reassign_to_mempool(_mempool);
+      }
+      append_buffer.set_length(0);   // unused, so far.
+    }
+  }
+
   // sort-of-like-assignment-op
   void buffer::list::claim(list& bl, unsigned int flags)
   {
@@ -1730,6 +1815,16 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     bl.last_p = bl.begin();
   }
 
+  void buffer::list::claim_append_piecewise(list& bl)
+  {
+    // steal the other guy's buffers
+    for (std::list<buffer::ptr>::const_iterator i = bl.buffers().begin();
+        i != bl.buffers().end(); i++) {
+      append(*i, 0, i->length());
+    }
+    bl.clear();
+  }
+
   void buffer::list::copy(unsigned off, unsigned len, char *dest) const
   {
     if (off + len > length())
@@ -1785,6 +1880,9 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
       // make a new append_buffer!
       append_buffer = raw_combined::create(CEPH_BUFFER_APPEND_SIZE);
       append_buffer.set_length(0);   // unused, so far.
+      if (_mempool >= 0) {
+       append_buffer.get_raw()->reassign_to_mempool(_mempool);
+      }
     }
     append(append_buffer, append_buffer.append(c) - 1, 1);     // add segment to the list
   }
@@ -1812,6 +1910,9 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
        sizeof(raw_combined);
       append_buffer = raw_combined::create(alen);
       append_buffer.set_length(0);   // unused, so far.
+      if (_mempool >= 0) {
+       append_buffer.get_raw()->reassign_to_mempool(_mempool);
+      }
     }
   }
 
@@ -2381,7 +2482,7 @@ __u32 buffer::list::crc32c(__u32 crc) const
          // got it already
          crc = ccrc.second;
          if (buffer_track_crc)
-           buffer_cached_crc.inc();
+           buffer_cached_crc++;
        } else {
          /* If we have cached crc32c(buf, v) for initial value v,
           * we can convert this to a different initial value v' by:
@@ -2393,11 +2494,11 @@ __u32 buffer::list::crc32c(__u32 crc) const
           */
          crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length());
          if (buffer_track_crc)
-           buffer_cached_crc_adjusted.inc();
+           buffer_cached_crc_adjusted++;
        }
       } else {
        if (buffer_track_crc)
-         buffer_missed_crc.inc();
+         buffer_missed_crc++;
        uint32_t base = crc;
        crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length());
        r->set_crc(ofs, make_pair(base, crc));
@@ -2502,8 +2603,27 @@ void buffer::list::hexdump(std::ostream &out, bool trailing_newline) const
   out.flags(original_flags);
 }
 
+
+buffer::list buffer::list::static_from_mem(char* c, size_t l) {
+  list bl;
+  bl.push_back(ptr(create_static(l, c)));
+  return bl;
+}
+
+buffer::list buffer::list::static_from_cstring(char* c) {
+  return static_from_mem(c, std::strlen(c));
+}
+
+buffer::list buffer::list::static_from_string(string& s) {
+  // C++14 just has string::data return a char* from a non-const
+  // string.
+  return static_from_mem(const_cast<char*>(s.data()), s.length());
+  // But the way buffer::list mostly doesn't work in a sane way with
+  // const makes me generally sad.
+}
+
 std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
-  return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.read() << ")";
+  return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
 }
 
 std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
@@ -2546,6 +2666,8 @@ MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned,
 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_pipe, buffer_raw_pipe, buffer_meta);
 #endif
 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
+MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
+                             buffer_meta);
 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_unshareable, buffer_raw_unshareable,
                              buffer_meta);
 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_static, buffer_raw_static,