ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
loff_t off)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
// split off right
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
if (left->get_journal_tid() == 0) {
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
// do not merge rx buffers; last_read_tid may not match
*/
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
while (left > 0) {
if (p == data.end())
*/
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
if (data.empty())
return true;
map<loff_t, BufferHead*>::iterator first = data.begin();
map<loff_t, BufferHead*>& rx,
map<loff_t, BufferHead*>& errors)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
ldout(oc->cct, 10) << "map_read " << ex.oid << " "
<< ex.offset << "~" << ex.length << dendl;
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
ceph_tid_t tid)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
BufferHead *final = 0;
ldout(oc->cct, 10) << "map_write oex " << ex.oid
void ObjectCacher::Object::truncate(loff_t s)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
while (!data.empty()) {
void ObjectCacher::Object::discard(loff_t off, loff_t len,
C_GatherBuilder* commit_gather)
{
- ceph_assert(oc->lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
<< dendl;
ObjectCacher::ObjectCacher(CephContext *cct_, string name,
- WritebackHandler& wb, Mutex& l,
+ WritebackHandler& wb, ceph::mutex& l,
flush_set_callback_t flush_callback,
void *flush_callback_arg, uint64_t max_bytes,
uint64_t max_objects, uint64_t max_dirty,
uint64_t truncate_seq)
{
// XXX: Add handling of nspace in object_locator_t in cache
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
// have it?
if ((uint32_t)l.pool < objects.size()) {
if (objects[l.pool].count(oid)) {
void ObjectCacher::close_object(Object *ob)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << "close_object " << *ob << dendl;
ceph_assert(ob->can_close());
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
const ZTracer::Trace &parent_trace)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
<< reads_outstanding << dendl;
uint64_t length, bufferlist &bl, int r,
bool trust_enoent)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 7) << "bh_read_finish "
<< oid
<< " tid " << tid
retry_waiting_reads();
--reads_outstanding;
- read_cond.Signal();
+ read_cond.notify_all();
}
void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
Object *ob = blist.front()->ob;
ob->get();
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 7) << "bh_write " << *bh << dendl;
bh->ob->get();
vector<pair<loff_t, uint64_t> >& ranges,
ceph_tid_t tid, int r)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
<< " ranges " << ranges << " returned " << r << dendl;
void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
ceph_assert(trace != nullptr);
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph::real_time cutoff = ceph::real_clock::now();
ldout(cct, 10) << "flush " << amount << dendl;
void ObjectCacher::trim()
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
<< get_stat_clean() << ", objects: max " << max_objects
<< " current " << ob_lru.lru_get_size() << dendl;
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
snapid_t snapid)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
++ex_it) {
bool external_call, ZTracer::Trace *trace)
{
ceph_assert(trace != nullptr);
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
bool success = true;
int error = 0;
uint64_t bytes_in_cache = 0;
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
ZTracer::Trace *parent_trace)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph::real_time now = ceph::real_clock::now();
uint64_t bytes_written = 0;
uint64_t bytes_written_in_flush = 0;
void ObjectCacher::C_WaitForWrite::finish(int r)
{
std::lock_guard l(m_oc->lock);
- m_oc->maybe_wait_for_writeback(m_len, &m_trace);
+ m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
m_onfinish->complete(r);
}
-void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
- ZTracer::Trace *trace)
+void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
+ ZTracer::Trace *trace)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph::mono_time start = ceph::mono_clock::now();
int blocked = 0;
// wait for writeback?
<< (get_stat_dirty() + get_stat_tx()) << " >= max "
<< max_dirty << " + dirty_waiting "
<< get_stat_dirty_waiting() << dendl;
- flusher_cond.Signal();
+ flusher_cond.notify_all();
stat_dirty_waiting += len;
++stat_nr_dirty_waiters;
- stat_cond.Wait(lock);
+ std::unique_lock l{lock, std::adopt_lock};
+ stat_cond.wait(l);
+ l.release();
stat_dirty_waiting -= len;
--stat_nr_dirty_waiters;
++blocked;
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
ZTracer::Trace *trace, Context *onfreespace)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(trace != nullptr);
int ret = 0;
if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
if (block_writes_upfront) {
- maybe_wait_for_writeback(len, trace);
+ _maybe_wait_for_writeback(len, trace);
if (onfreespace)
onfreespace->complete(0);
} else {
}
} else {
// write-thru! flush what we just wrote.
- Cond cond;
+ ceph::condition_variable cond;
bool done = false;
Context *fin = block_writes_upfront ?
- new C_Cond(&cond, &done, &ret) : onfreespace;
+ new C_Cond(cond, &done, &ret) : onfreespace;
ceph_assert(fin);
bool flushed = flush_set(oset, wr->extents, trace, fin);
ceph_assert(!flushed); // we just dirtied it, and didn't drop our lock!
ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
<< " bytes" << dendl;
if (block_writes_upfront) {
- while (!done)
- cond.Wait(lock);
+ std::unique_lock l{lock, std::adopt_lock};
+ cond.wait(l, [&done] { return done; });
+ l.release();
ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
if (onfreespace)
onfreespace->complete(ret);
if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
<< target_dirty << ", nudging flusher" << dendl;
- flusher_cond.Signal();
+ flusher_cond.notify_all();
}
return ret;
}
void ObjectCacher::flusher_entry()
{
ldout(cct, 10) << "flusher start" << dendl;
- lock.Lock();
+ std::unique_lock l{lock};
while (!flusher_stop) {
loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
get_stat_dirty();
if (!max) {
// back off the lock to avoid starving other threads
trace.event("backoff");
- lock.Unlock();
- lock.Lock();
+ l.unlock();
+ l.lock();
continue;
}
}
if (flusher_stop)
break;
- flusher_cond.WaitInterval(lock, seconds(1));
+ flusher_cond.wait_for(l, 1s);
}
/* Wait for reads to finish. This is only possible if handling
* the rados reads do come back their callback will try to access the
* no-longer-valid ObjectCacher.
*/
- while (reads_outstanding > 0) {
- ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
- << reads_outstanding << dendl;
- read_cond.Wait(lock);
- }
-
- lock.Unlock();
+ read_cond.wait(l, [this] {
+ if (reads_outstanding > 0) {
+ ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
+ << reads_outstanding << dendl;
+ return false;
+ } else {
+ return true;
+ }
+ });
ldout(cct, 10) << "flusher finish" << dendl;
}
bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (oset->objects.empty())
return true;
bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (oset->objects.empty())
return false;
bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (oset->objects.empty())
return false;
// purge. non-blocking. violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << "purge " << *ob << dendl;
ob->truncate(0);
ZTracer::Trace *trace)
{
ceph_assert(trace != nullptr);
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
list<BufferHead*> blist;
bool clean = true;
ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
Context *onfinish)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (gather->has_subs()) {
gather->set_finisher(onfinish);
gather->activate();
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
ZTracer::Trace *trace, Context *onfinish)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(trace != nullptr);
ceph_assert(onfinish != NULL);
if (oset->objects.empty()) {
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(onfinish != NULL);
ldout(cct, 10) << "flush_all " << dendl;
void ObjectCacher::purge_set(ObjectSet *oset)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (oset->objects.empty()) {
ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
return;
loff_t ObjectCacher::release(Object *ob)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
list<BufferHead*> clean;
loff_t o_unclean = 0;
loff_t ObjectCacher::release_set(ObjectSet *oset)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
// return # bytes not clean (and thus not released).
loff_t unclean = 0;
uint64_t ObjectCacher::release_all()
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << "release_all" << dendl;
uint64_t unclean = 0;
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
for (xlist<Object*>::iterator p = oset->objects.begin();
*/
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
bool was_dirty = oset->dirty_or_tx > 0;
_discard(oset, exls, nullptr);
const vector<ObjectExtent>& exls,
Context* on_finish)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
bool was_dirty = oset->dirty_or_tx > 0;
C_GatherBuilder gather(cct);
if (gather.has_subs()) {
bool flushed = was_dirty && oset->dirty_or_tx == 0;
- gather.set_finisher(new FunctionContext(
+ gather.set_finisher(new LambdaContext(
[this, oset, flushed, on_finish](int) {
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (flushed && flush_set_callback)
flush_set_callback(flush_set_callback_arg, oset);
if (on_finish)
void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
Context* on_finish)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
// did we truncate off dirty data?
if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
void ObjectCacher::verify_stats() const
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 10) << "verify_stats" << dendl;
loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
void ObjectCacher::bh_stat_add(BufferHead *bh)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
switch (bh->get_state()) {
case BufferHead::STATE_MISSING:
stat_missing += bh->length();
ceph_abort_msg("bh_stat_add: invalid bufferhead state");
}
if (get_stat_dirty_waiting() > 0)
- stat_cond.Signal();
+ stat_cond.notify_all();
}
void ObjectCacher::bh_stat_sub(BufferHead *bh)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
switch (bh->get_state()) {
case BufferHead::STATE_MISSING:
stat_missing -= bh->length();
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
int state = bh->get_state();
// move between lru lists?
if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
ob->add_bh(bh);
if (bh->is_dirty()) {
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
- ceph_assert(lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(lock));
ceph_assert(bh->get_journal_tid() == 0);
ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
ob->remove_bh(bh);
}
bh_stat_sub(bh);
if (get_stat_dirty_waiting() > 0)
- stat_cond.Signal();
+ stat_cond.notify_all();
}