#include "include/assert.h"
#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
                                /// while holding the lock
+#define BUFFER_MEMORY_WEIGHT 12 // memory footprint of a BufferHead, as a power of two (1<<12 bytes = 4 KiB)
using std::chrono::seconds;
xlist<C_ReadFinish*>::item set_item;
bool trust_enoent;
ceph_tid_t tid;
+ ZTracer::Trace trace;
public:
bufferlist bl;
C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
- uint64_t l) :
+ uint64_t l, const ZTracer::Trace &trace) :
oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
set_item(this), trust_enoent(true),
- tid(t) {
+ tid(t), trace(trace) {
ob->reads.push_back(&set_item);
}
void finish(int r) override {
oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
+ trace.event("finish");
// object destructor clears the list
if (set_item.is_on_list())
OSDRead *rd;
ObjectSet *oset;
Context *onfinish;
+ ZTracer::Trace trace;
public:
- C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c)
- : oc(_oc), rd(r), oset(os), onfinish(c) {}
+ C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
+ const ZTracer::Trace &trace)
+ : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
+ }
void finish(int r) override {
- if (r < 0) {
- if (onfinish)
- onfinish->complete(r);
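+    // the wait that queued this retry has completed; retry the read
+    // unless we were woken with an error, in which case deliver r below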
+ if (r >= 0) {
+ r = oc->_readx(rd, oset, onfinish, false, &trace);
+ }
+
+ if (r == 0) {
+ // read is still in-progress
return;
}
- int ret = oc->_readx(rd, oset, onfinish, false);
- if (ret != 0 && onfinish) {
- onfinish->complete(ret);
+
+ trace.event("finish");
+ if (onfinish) {
+ onfinish->complete(r);
}
}
};
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
assert(oc->lock.is_locked());
- assert(left->end() == right->start());
- assert(left->get_state() == right->get_state());
- assert(left->can_merge_journal(right));
ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
if (left->get_journal_tid() == 0) {
ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
}
+bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
+{
+ if (left->end() != right->start() ||
+ left->get_state() != right->get_state() ||
+ !left->can_merge_journal(right))
+ return false;
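+  // never merge tx buffers from different writes: bh_write_commit
+  // assumes each tx buffer lies entirely within one in-flight write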
+ if (left->is_tx() && left->last_write_tid != right->last_write_tid)
+ return false;
+ return true;
+}
+
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
assert(oc->lock.is_locked());
assert(p->second == bh);
if (p != data.begin()) {
--p;
- if (p->second->end() == bh->start() &&
- p->second->get_state() == bh->get_state() &&
- p->second->can_merge_journal(bh)) {
+ if (can_merge_bh(p->second, bh)) {
merge_left(p->second, bh);
bh = p->second;
} else {
// to the right?
assert(p->second == bh);
++p;
- if (p != data.end() &&
- p->second->start() == bh->end() &&
- p->second->get_state() == bh->get_state() &&
- p->second->can_merge_journal(bh))
+ if (p != data.end() && can_merge_bh(bh, p->second))
merge_left(bh, p->second);
}
map<loff_t, BufferHead*>& errors)
{
assert(oc->lock.is_locked());
- ldout(oc->cct, 10) << "map_read " << ex.oid
- << " " << ex.offset << "~" << ex.length
- << dendl;
-
+ ldout(oc->cct, 10) << "map_read " << ex.oid << " "
+ << ex.offset << "~" << ex.length << dendl;
+
loff_t cur = ex.offset;
loff_t left = ex.length;
assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
break; // no more.
}
-
+
if (p->first <= cur) {
// have it (or part of it)
BufferHead *e = p->second;
} else {
ceph_abort();
}
-
+
loff_t lenfromcur = MIN(e->end() - cur, left);
cur += lenfromcur;
left -= lenfromcur;
++p;
continue; // more?
-
+
} else if (p->first > cur) {
// gap.. miss
loff_t next = p->first;
* other dirty data to left and/or right.
*/
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
- ceph_tid_t tid)
+ ceph_tid_t tid)
{
assert(oc->lock.is_locked());
BufferHead *final = 0;
max_size(max_bytes), max_objects(max_objects),
max_dirty_age(ceph::make_timespan(max_dirty_age)),
block_writes_upfront(block_writes_upfront),
+ trace_endpoint("ObjectCacher"),
flush_set_callback(flush_callback),
flush_set_callback_arg(flush_callback_arg),
last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
- stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
+ stat_missing(0), stat_error(0), stat_dirty_waiting(0),
+ stat_nr_dirty_waiters(0), reads_outstanding(0)
{
perf_start();
finisher.start();
delete ob;
}
-void ObjectCacher::bh_read(BufferHead *bh, int op_flags)
+void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
+ const ZTracer::Trace &parent_trace)
{
assert(lock.is_locked());
ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
<< reads_outstanding << dendl;
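+  // start a child span only if the caller is tracing; events on an
+  // uninitialized trace are no-ops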
+ ZTracer::Trace trace;
+ if (parent_trace.valid()) {
+ trace.init("", &trace_endpoint, &parent_trace);
+ trace.copy_name("bh_read " + bh->ob->get_oid().name);
+ trace.event("start");
+ }
+
mark_rx(bh);
bh->last_read_tid = ++last_read_tid;
// finisher
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
- bh->start(), bh->length());
+ bh->start(), bh->length(), trace);
// go
writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
bh->ob->get_oloc(), bh->start(), bh->length(),
bh->ob->get_snap(), &onfinish->bl,
bh->ob->truncate_size, bh->ob->truncate_seq,
- op_flags, onfinish);
+ op_flags, trace, onfinish);
++reads_outstanding;
}
int64_t poolid;
sobject_t oid;
vector<pair<loff_t, uint64_t> > ranges;
+ ZTracer::Trace trace;
public:
- ceph_tid_t tid;
+ ceph_tid_t tid = 0;
C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
- uint64_t l) :
- oc(c), poolid(_poolid), oid(o), tid(0) {
+ uint64_t l, const ZTracer::Trace &trace) :
+ oc(c), poolid(_poolid), oid(o), trace(trace) {
ranges.push_back(make_pair(s, l));
}
C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
}
void finish(int r) override {
oc->bh_write_commit(poolid, oid, ranges, tid, r);
+ trace.event("finish");
}
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
-void ObjectCacher::bh_write(BufferHead *bh)
+void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
assert(lock.is_locked());
ldout(cct, 7) << "bh_write " << *bh << dendl;
bh->ob->get();
+ ZTracer::Trace trace;
+ if (parent_trace.valid()) {
+ trace.init("", &trace_endpoint, &parent_trace);
+ trace.copy_name("bh_write " + bh->ob->get_oid().name);
+ trace.event("start");
+ }
+
// finishers
C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
bh->ob->get_soid(), bh->start(),
- bh->length());
+ bh->length(), trace);
// go
ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
bh->ob->get_oloc(),
bh->snapc, bh->bl, bh->last_write,
bh->ob->truncate_size,
bh->ob->truncate_seq,
- bh->journal_tid, oncommit);
+ bh->journal_tid, trace, oncommit);
ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
// set bh last_write_tid
++p) {
BufferHead *bh = p->second;
- if (bh->start() > start+(loff_t)length)
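+    // the bh map is sorted by start offset, so a bh beginning at or
+    // past the end of this write cannot belong to it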
+ if (bh->start() >= start+(loff_t)length)
break;
- if (bh->start() < start &&
- bh->end() > start+(loff_t)length) {
- ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
- continue;
- }
-
// make sure bh is tx
if (!bh->is_tx()) {
ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
continue;
}
+    // tx buffers are never merged, so this buffer must lie entirely
+    // within the range covered by this write
+ assert(bh->start() >= start);
+ assert(bh->end() <= start+(loff_t)length);
+
if (r >= 0) {
// ok! mark bh clean and error-free
mark_clean(bh);
finish_contexts(cct, ls, r);
}
-void ObjectCacher::flush(loff_t amount)
+void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
+ assert(trace != nullptr);
assert(lock.is_locked());
ceph::real_time cutoff = ceph::real_clock::now();
bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
} else {
left -= bh->length();
- bh_write(bh);
+ bh_write(bh, *trace);
}
- }
+ }
}
<< get_stat_clean() << ", objects: max " << max_objects
<< " current " << ob_lru.lru_get_size() << dendl;
- while (get_stat_clean() > 0 && (uint64_t) get_stat_clean() > max_size) {
+ uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
+ uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
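+  // cap the number of clean bh's, not just clean bytes: many small
+  // buffers can consume far more BufferHead overhead than max_size
+  // implies, so weight each bh at (1 << BUFFER_MEMORY_WEIGHT) bytes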
+ while (get_stat_clean() > 0 &&
+ ((uint64_t)get_stat_clean() > max_size ||
+ nr_clean_bh > max_clean_bh)) {
BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
if (!bh)
break;
bh_remove(ob, bh);
delete bh;
+ --nr_clean_bh;
+
if (ob->complete) {
ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
ob->complete = false;
* must delete it)
* returns 0 if doing async read
*/
-int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+ ZTracer::Trace *parent_trace)
{
- return _readx(rd, oset, onfinish, true);
+ ZTracer::Trace trace;
+ if (parent_trace != nullptr) {
+ trace.init("read", &trace_endpoint, parent_trace);
+ trace.event("start");
+ }
+
+  int r = _readx(rd, oset, onfinish, true, &trace);
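+  // emit the finish event on immediate failure; async paths hand a
+  // copy of the trace to C_RetryRead, which finishes the span itself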
+ if (r < 0) {
+ trace.event("finish");
+ }
+ return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- bool external_call)
+ bool external_call, ZTracer::Trace *trace)
{
+ assert(trace != nullptr);
assert(lock.is_locked());
bool success = true;
int error = 0;
if (scattered_write)
blist.push_back(bh);
else
- bh_write(bh);
+ bh_write(bh, *trace);
}
}
}
ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
<< " on " << *o << dendl;
o->waitfor_commit[o->last_write_tid].push_back(
- new C_RetryRead(this,rd, oset, onfinish));
+      new C_RetryRead(this, rd, oset, onfinish, *trace));
// FIXME: perfcounter!
return 0;
}
<< waitfor_read.size() << " blocked reads, "
<< (MAX(rx_bytes, max_size) - max_size)
<< " read bytes" << dendl;
- waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+ waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
+ *trace));
}
bh_remove(o, bh_it->second);
delete bh_it->second;
} else {
bh_it->second->set_nocache(nocache);
- bh_read(bh_it->second, rd->fadvise_flags);
+ bh_read(bh_it->second, rd->fadvise_flags, *trace);
if ((success && onfinish) || last != missing.end())
last = bh_it;
}
ldout(cct, 10) << "readx missed, waiting on " << *last->second
<< " off " << last->first << dendl;
last->second->waitfor_read[last->first].push_back(
- new C_RetryRead(this, rd, oset, onfinish) );
+ new C_RetryRead(this, rd, oset, onfinish, *trace) );
}
ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back(
- new C_RetryRead(this, rd, oset, onfinish) );
+ new C_RetryRead(this, rd, oset, onfinish, *trace) );
}
bytes_not_in_cache += bh_it->second->length();
success = false;
waitfor_read.splice(waitfor_read.end(), ls);
}
-int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+ ZTracer::Trace *parent_trace)
{
assert(lock.is_locked());
ceph::real_time now = ceph::real_clock::now();
bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
+ ZTracer::Trace trace;
+ if (parent_trace != nullptr) {
+ trace.init("write", &trace_endpoint, parent_trace);
+ trace.event("start");
+ }
+
for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
++ex_it) {
BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
bool missing = bh->is_missing();
bh->snapc = wr->snapc;
-
+
bytes_written += ex_it->length;
if (bh->is_tx()) {
bytes_written_in_flush += ex_it->length;
}
}
- int r = _wait_for_write(wr, bytes_written, oset, onfreespace);
+ int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
delete wr;
//verify_stats();
class ObjectCacher::C_WaitForWrite : public Context {
public:
- C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
- m_oc(oc), m_len(len), m_onfinish(onfinish) {}
+ C_WaitForWrite(ObjectCacher *oc, uint64_t len,
+ const ZTracer::Trace &trace, Context *onfinish) :
+ m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
void finish(int r) override;
private:
ObjectCacher *m_oc;
uint64_t m_len;
+ ZTracer::Trace m_trace;
Context *m_onfinish;
};
void ObjectCacher::C_WaitForWrite::finish(int r)
{
Mutex::Locker l(m_oc->lock);
- m_oc->maybe_wait_for_writeback(m_len);
+ m_oc->maybe_wait_for_writeback(m_len, &m_trace);
m_onfinish->complete(r);
}
-void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
+void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
+ ZTracer::Trace *trace)
{
assert(lock.is_locked());
ceph::mono_time start = ceph::mono_clock::now();
// - do not wait for bytes other waiters are waiting on. this means that
// threads do not wait for each other. this effectively allows the cache
// size to balloon proportional to the data that is in flight.
+
+ uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
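+  // as in trim(), throttle on the count of dirty/tx bh's as well as
+  // their bytes, so floods of small dirty buffers also block writers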
while (get_stat_dirty() + get_stat_tx() > 0 &&
- (uint64_t) (get_stat_dirty() + get_stat_tx()) >=
- max_dirty + get_stat_dirty_waiting()) {
+ (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
+ max_dirty + get_stat_dirty_waiting()) ||
+ (dirty_or_tx_bh.size() >=
+ max_dirty_bh + get_stat_nr_dirty_waiters()))) {
+
+ if (blocked == 0) {
+ trace->event("start wait for writeback");
+ }
ldout(cct, 10) << __func__ << " waiting for dirty|tx "
<< (get_stat_dirty() + get_stat_tx()) << " >= max "
<< max_dirty << " + dirty_waiting "
<< get_stat_dirty_waiting() << dendl;
flusher_cond.Signal();
stat_dirty_waiting += len;
+ ++stat_nr_dirty_waiters;
stat_cond.Wait(lock);
stat_dirty_waiting -= len;
+ --stat_nr_dirty_waiters;
++blocked;
ldout(cct, 10) << __func__ << " woke up" << dendl;
}
+ if (blocked > 0) {
+ trace->event("finish wait for writeback");
+ }
if (blocked && perfcounter) {
perfcounter->inc(l_objectcacher_write_ops_blocked);
perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
- Context *onfreespace)
+ ZTracer::Trace *trace, Context *onfreespace)
{
assert(lock.is_locked());
+ assert(trace != nullptr);
int ret = 0;
if (max_dirty > 0) {
if (block_writes_upfront) {
- maybe_wait_for_writeback(len);
+ maybe_wait_for_writeback(len, trace);
if (onfreespace)
onfreespace->complete(0);
} else {
assert(onfreespace);
- finisher.queue(new C_WaitForWrite(this, len, onfreespace));
+ finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
}
} else {
// write-thru! flush what we just wrote.
Context *fin = block_writes_upfront ?
new C_Cond(&cond, &done, &ret) : onfreespace;
assert(fin);
- bool flushed = flush_set(oset, wr->extents, fin);
+ bool flushed = flush_set(oset, wr->extents, trace, fin);
assert(!flushed); // we just dirtied it, and didn't drop our lock!
ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
<< " bytes" << dendl;
<< max_dirty << " max)"
<< dendl;
loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
+
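+  // the flusher has no caller to inherit a span from; it creates a
+  // standalone root span only when osdc_blkin_trace_all is enabled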
+ ZTracer::Trace trace;
+ if (cct->_conf->osdc_blkin_trace_all) {
+ trace.init("flusher", &trace_endpoint);
+ trace.event("start");
+ }
+
if (actual > 0 && (uint64_t) actual > target_dirty) {
// flush some dirty pages
ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
<< get_stat_dirty_waiting() << " dirty_waiting > target "
<< target_dirty << ", flushing some dirty bhs" << dendl;
- flush(actual - target_dirty);
+ flush(&trace, actual - target_dirty);
} else {
// check tail of lru for old dirty items
ceph::real_time cutoff = ceph::real_clock::now();
if (scattered_write) {
bh_write_adjacencies(bh, cutoff, NULL, &max);
} else {
- bh_write(bh);
+ bh_write(bh, trace);
--max;
}
}
if (!max) {
// back off the lock to avoid starving other threads
+ trace.event("backoff");
lock.Unlock();
lock.Lock();
continue;
}
}
+
+ trace.event("finish");
if (flusher_stop)
break;
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
-bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
+ ZTracer::Trace *trace)
{
+ assert(trace != nullptr);
assert(lock.is_locked());
list<BufferHead*> blist;
bool clean = true;
if (scattered_write)
blist.push_back(bh);
else
- bh_write(bh);
+ bh_write(bh, *trace);
clean = false;
}
if (scattered_write && !blist.empty())
}
blist.push_back(bh);
} else {
- bh_write(bh);
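+      // no parent span on this path; an empty trace makes bh_write
+      // skip span creation entirely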
+ bh_write(bh, {});
}
}
}
}
blist.push_front(bh);
} else {
- bh_write(bh);
+ bh_write(bh, {});
}
}
if (!backwards)
// flush. non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
- Context *onfinish)
+ ZTracer::Trace *trace, Context *onfinish)
{
assert(lock.is_locked());
+ assert(trace != nullptr);
assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
<< " " << ob << dendl;
- if (!flush(ob, ex.offset, ex.length)) {
+ if (!flush(ob, ex.offset, ex.length, trace)) {
// we'll need to gather...
ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
<< ob->last_write_tid << " on " << *ob << dendl;
}
blist.push_back(bh);
} else {
- bh_write(bh);
+ bh_write(bh, {});
}
}