*
*/
+#include <atomic>
+#include <errno.h>
+#include <limits.h>
+
+#include <sys/uio.h>
+
#include "include/compat.h"
#include "include/mempool.h"
#include "armor.h"
#include "common/likely.h"
#include "common/valgrind.h"
#include "common/deleter.h"
-#include "include/atomic.h"
#include "common/RWLock.h"
#include "include/types.h"
-#include "include/compat.h"
-#include "include/inline_memory.h"
#include "include/scope_guard.h"
+
#if defined(HAVE_XIO)
#include "msg/xio/XioMsg.h"
#endif
-#include <errno.h>
-#include <fstream>
-#include <sstream>
-#include <sys/uio.h>
-#include <limits.h>
-
-#include <atomic>
-#include <ostream>
+using namespace ceph;
#define CEPH_BUFFER_ALLOC_UNIT (MIN(CEPH_PAGE_SIZE, 4096))
#define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))
# define bendl std::endl; }
#endif
- static atomic_t buffer_total_alloc;
- static atomic64_t buffer_history_alloc_bytes;
- static atomic64_t buffer_history_alloc_num;
+ static std::atomic<uint64_t> buffer_total_alloc { 0 };
+ static std::atomic<uint64_t> buffer_history_alloc_bytes { 0 };
+ static std::atomic<uint64_t> buffer_history_alloc_num { 0 };
+
const bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK");
namespace {
void inc_total_alloc(unsigned len) {
if (buffer_track_alloc)
- buffer_total_alloc.add(len);
+ buffer_total_alloc += len;
}
void dec_total_alloc(unsigned len) {
if (buffer_track_alloc)
- buffer_total_alloc.sub(len);
+ buffer_total_alloc -= len;
}
void inc_history_alloc(uint64_t len) {
if (buffer_track_alloc) {
- buffer_history_alloc_bytes.add(len);
- buffer_history_alloc_num.inc();
+ buffer_history_alloc_bytes += len;
+ buffer_history_alloc_num++;
}
}
- }
-
+ } // namespace
int buffer::get_total_alloc() {
- return buffer_total_alloc.read();
+ return buffer_total_alloc;
}
uint64_t buffer::get_history_alloc_bytes() {
- return buffer_history_alloc_bytes.read();
+ return buffer_history_alloc_bytes;
}
uint64_t buffer::get_history_alloc_num() {
- return buffer_history_alloc_num.read();
+ return buffer_history_alloc_num;
}
- static atomic_t buffer_cached_crc;
- static atomic_t buffer_cached_crc_adjusted;
- static atomic_t buffer_missed_crc;
+ static std::atomic<unsigned> buffer_cached_crc { 0 };
+ static std::atomic<unsigned> buffer_cached_crc_adjusted { 0 };
+ static std::atomic<unsigned> buffer_missed_crc { 0 };
+
static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK");
void buffer::track_cached_crc(bool b) {
buffer_track_crc = b;
}
int buffer::get_cached_crc() {
- return buffer_cached_crc.read();
+ return buffer_cached_crc;
}
int buffer::get_cached_crc_adjusted() {
- return buffer_cached_crc_adjusted.read();
+ return buffer_cached_crc_adjusted;
}
int buffer::get_missed_crc() {
- return buffer_missed_crc.read();
+ return buffer_missed_crc;
}
- static atomic_t buffer_c_str_accesses;
+ static std::atomic<unsigned> buffer_c_str_accesses { 0 };
+
static bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK");
void buffer::track_c_str(bool b) {
buffer_track_c_str = b;
}
int buffer::get_c_str_accesses() {
- return buffer_c_str_accesses.read();
+ return buffer_c_str_accesses;
}
- static atomic_t buffer_max_pipe_size;
+ static std::atomic<unsigned> buffer_max_pipe_size { 0 };
int update_max_pipe_size() {
#ifdef CEPH_HAVE_SETPIPE_SZ
char buf[32];
size_t size = strict_strtol(buf, 10, &err);
if (!err.empty())
return -EIO;
- buffer_max_pipe_size.set(size);
+ buffer_max_pipe_size = size;
#endif
return 0;
}
size_t get_max_pipe_size() {
#ifdef CEPH_HAVE_SETPIPE_SZ
- size_t size = buffer_max_pipe_size.read();
+ size_t size = buffer_max_pipe_size;
if (size)
return size;
if (update_max_pipe_size() == 0)
- return buffer_max_pipe_size.read();
+ return buffer_max_pipe_size;
#endif
// this is the max size hardcoded in linux before 2.6.35
return 65536;
public:
char *data;
unsigned len;
- atomic_t nref;
+ std::atomic<unsigned> nref { 0 };
+ int mempool = mempool::mempool_buffer_anon;
mutable std::atomic_flag crc_spinlock = ATOMIC_FLAG_INIT;
map<pair<size_t, size_t>, pair<uint32_t, uint32_t> > crc_map;
explicit raw(unsigned l)
- : data(NULL), len(l), nref(0)
- { }
+ : data(NULL), len(l), nref(0) {
+ mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
+ }
raw(char *c, unsigned l)
- : data(c), len(l), nref(0)
- { }
- virtual ~raw() {}
+ : data(c), len(l), nref(0) {
+ mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
+ }
+ virtual ~raw() {
+ mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
+ -1, -(int)len);
+ }
+
+ void _set_len(unsigned l) {
+ mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
+ -1, -(int)len);
+ len = l;
+ mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
+ }
+
+ void reassign_to_mempool(int pool) {
+ if (pool == mempool) {
+ return;
+ }
+ mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
+ -1, -(int)len);
+ mempool = pool;
+ mempool::get_pool(mempool::pool_index_t(pool)).adjust_count(1, len);
+ }
+
+ void try_assign_to_mempool(int pool) {
+ if (mempool == mempool::mempool_buffer_anon) {
+ reassign_to_mempool(pool);
+ }
+ }
// no copying.
// cppcheck-suppress noExplicitConstructor
}
};
- MEMPOOL_DEFINE_FACTORY(char, char, buffer_data);
-
/*
* raw_combined is always placed within a single allocation along
* with the data buffer. the data goes at the beginning, and
alignof(buffer::raw_combined));
size_t datalen = ROUND_UP_TO(len, alignof(buffer::raw_combined));
- char *ptr = mempool::buffer_data::alloc_char.allocate_aligned(
- rawlen + datalen, align);
+#ifdef DARWIN
+ char *ptr = (char *) valloc(rawlen + datalen);
+#else
+ char *ptr = 0;
+ int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
+ if (r)
+ throw bad_alloc();
+#endif /* DARWIN */
if (!ptr)
throw bad_alloc();
static void operator delete(void *ptr) {
raw_combined *raw = (raw_combined *)ptr;
- size_t rawlen = ROUND_UP_TO(sizeof(buffer::raw_combined),
- alignof(buffer::raw_combined));
- size_t datalen = ROUND_UP_TO(raw->len, alignof(buffer::raw_combined));
- mempool::buffer_data::alloc_char.deallocate_aligned(
- raw->data, rawlen + datalen);
+ ::free((void *)raw->data);
}
};
raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
align = _align;
assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
- data = mempool::buffer_data::alloc_char.allocate_aligned(len, align);
+#ifdef DARWIN
+ data = (char *) valloc(len);
+#else
+ int r = ::posix_memalign((void**)(void*)&data, align, len);
+ if (r)
+ throw bad_alloc();
+#endif /* DARWIN */
if (!data)
throw bad_alloc();
inc_total_alloc(len);
bdout << "raw_posix_aligned " << this << " alloc " << (void *)data << " l=" << l << ", align=" << align << " total_alloc=" << buffer::get_total_alloc() << bendl;
}
~raw_posix_aligned() override {
- mempool::buffer_data::alloc_char.deallocate_aligned(data, len);
+ ::free(data);
dec_total_alloc(len);
bdout << "raw_posix_aligned " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
}
return r;
}
// update length with actual amount read
- len = r;
+ _set_len(r);
return 0;
}
explicit raw_char(unsigned l) : raw(l) {
if (len)
- data = mempool::buffer_data::alloc_char.allocate(len);
+ data = new char[len];
else
data = 0;
inc_total_alloc(len);
bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
}
~raw_char() override {
- if (data)
- mempool::buffer_data::alloc_char.deallocate(data, len);
+ delete[] data;
dec_total_alloc(len);
bdout << "raw_char " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
}
}
};
+ class buffer::raw_claimed_char : public buffer::raw {
+ public:
+ MEMPOOL_CLASS_HELPERS();
+
+ explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
+ inc_total_alloc(len);
+ bdout << "raw_claimed_char " << this << " alloc " << (void *)data
+ << " " << l << " " << buffer::get_total_alloc() << bendl;
+ }
+ ~raw_claimed_char() override {
+ dec_total_alloc(len);
+ bdout << "raw_claimed_char " << this << " free " << (void *)data
+ << " " << buffer::get_total_alloc() << bendl;
+ }
+ raw* clone_empty() override {
+ return new raw_char(len);
+ }
+ };
+
class buffer::raw_unshareable : public buffer::raw {
public:
MEMPOOL_CLASS_HELPERS();
return buffer::create_aligned(len, sizeof(size_t));
}
buffer::raw* buffer::claim_char(unsigned len, char *buf) {
- return new raw_char(len, buf);
+ return new raw_claimed_char(len, buf);
}
buffer::raw* buffer::create_malloc(unsigned len) {
return new raw_malloc(len);
buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw.
{
- r->nref.inc();
+ r->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
{
_raw = create(l);
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l) // ditto.
{
_raw = copy(d, l);
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len)
{
if (_raw) {
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
}
{
assert(o+l <= p._len);
assert(_raw);
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr& buffer::ptr::operator= (const ptr& p)
{
if (p._raw) {
- p._raw->nref.inc();
+ p._raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::raw *raw = p._raw;
if (_raw && !_raw->is_shareable()) {
buffer::raw *tr = _raw;
_raw = tr->clone();
- _raw->nref.set(1);
- if (unlikely(tr->nref.dec() == 0)) {
+ _raw->nref = 1;
+ if (unlikely(--tr->nref == 0)) {
ANNOTATE_HAPPENS_AFTER(&tr->nref);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&tr->nref);
delete tr;
{
if (_raw) {
bdout << "ptr " << this << " release " << _raw << bendl;
- if (_raw->nref.dec() == 0) {
+ if (--_raw->nref == 0) {
//cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
ANNOTATE_HAPPENS_AFTER(&_raw->nref);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
const char *buffer::ptr::c_str() const {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off;
}
char *buffer::ptr::c_str() {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off;
}
const char *buffer::ptr::end_c_str() const {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off + _len;
}
char *buffer::ptr::end_c_str() {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off + _len;
}
const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; }
unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; }
- int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref.read(); }
+ int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref; }
void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
assert(_raw);
maybe_inline_memcpy(dest, src, l, 8);
}
- unsigned buffer::ptr::wasted()
+ unsigned buffer::ptr::wasted() const
{
- assert(_raw);
return _raw->len - _len;
}
{
std::swap(_len, other._len);
std::swap(_memcopy_count, other._memcopy_count);
+ std::swap(_mempool, other._mempool);
_buffers.swap(other._buffers);
append_buffer.swap(other.append_buffer);
//last_p.swap(other.last_p);
return is_aligned(CEPH_PAGE_SIZE);
}
+ void buffer::list::reassign_to_mempool(int pool)
+ {
+ _mempool = pool;
+ if (append_buffer.get_raw()) {
+ append_buffer.get_raw()->reassign_to_mempool(pool);
+ }
+ for (auto& p : _buffers) {
+ p.get_raw()->reassign_to_mempool(pool);
+ }
+ }
+
+ void buffer::list::try_assign_to_mempool(int pool)
+ {
+ _mempool = pool;
+ if (append_buffer.get_raw()) {
+ append_buffer.get_raw()->try_assign_to_mempool(pool);
+ }
+ for (auto& p : _buffers) {
+ p.get_raw()->try_assign_to_mempool(pool);
+ }
+ }
+
void buffer::list::rebuild()
{
if (_len == 0) {
return rebuild_aligned(CEPH_PAGE_SIZE);
}
+ void buffer::list::reserve(size_t prealloc)
+ {
+ if (append_buffer.unused_tail_length() < prealloc) {
+ append_buffer = buffer::create(prealloc);
+ if (_mempool >= 0) {
+ append_buffer.get_raw()->reassign_to_mempool(_mempool);
+ }
+ append_buffer.set_length(0); // unused, so far.
+ }
+ }
+
// sort-of-like-assignment-op
void buffer::list::claim(list& bl, unsigned int flags)
{
bl.last_p = bl.begin();
}
+ void buffer::list::claim_append_piecewise(list& bl)
+ {
+ // steal the other guy's buffers
+ for (std::list<buffer::ptr>::const_iterator i = bl.buffers().begin();
+ i != bl.buffers().end(); i++) {
+ append(*i, 0, i->length());
+ }
+ bl.clear();
+ }
+
void buffer::list::copy(unsigned off, unsigned len, char *dest) const
{
if (off + len > length())
// make a new append_buffer!
append_buffer = raw_combined::create(CEPH_BUFFER_APPEND_SIZE);
append_buffer.set_length(0); // unused, so far.
+ if (_mempool >= 0) {
+ append_buffer.get_raw()->reassign_to_mempool(_mempool);
+ }
}
append(append_buffer, append_buffer.append(c) - 1, 1); // add segment to the list
}
sizeof(raw_combined);
append_buffer = raw_combined::create(alen);
append_buffer.set_length(0); // unused, so far.
+ if (_mempool >= 0) {
+ append_buffer.get_raw()->reassign_to_mempool(_mempool);
+ }
}
}
// got it already
crc = ccrc.second;
if (buffer_track_crc)
- buffer_cached_crc.inc();
+ buffer_cached_crc++;
} else {
/* If we have cached crc32c(buf, v) for initial value v,
* we can convert this to a different initial value v' by:
*/
crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length());
if (buffer_track_crc)
- buffer_cached_crc_adjusted.inc();
+ buffer_cached_crc_adjusted++;
}
} else {
if (buffer_track_crc)
- buffer_missed_crc.inc();
+ buffer_missed_crc++;
uint32_t base = crc;
crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length());
r->set_crc(ofs, make_pair(base, crc));
out.flags(original_flags);
}
+
+buffer::list buffer::list::static_from_mem(char* c, size_t l) {
+ list bl;
+ bl.push_back(ptr(create_static(l, c)));
+ return bl;
+}
+
+buffer::list buffer::list::static_from_cstring(char* c) {
+ return static_from_mem(c, std::strlen(c));
+}
+
+buffer::list buffer::list::static_from_string(string& s) {
+ // C++14 just has string::data return a char* from a non-const
+ // string.
+ return static_from_mem(const_cast<char*>(s.data()), s.length());
+ // But the way buffer::list mostly doesn't work in a sane way with
+ // const makes me generally sad.
+}
+
std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
- return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.read() << ")";
+ return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
}
std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_pipe, buffer_raw_pipe, buffer_meta);
#endif
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
+MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
+ buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_unshareable, buffer_raw_unshareable,
buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_static, buffer_raw_static,