<< dendl;
// kick waiters <= safe_pos
- while (!waitfor_safe.empty()) {
- if (waitfor_safe.begin()->first > safe_pos)
- break;
- finish_contexts(cct, waitfor_safe.begin()->second);
- waitfor_safe.erase(waitfor_safe.begin());
+ if (!waitfor_safe.empty()) {
+ list<Context*> ls;
+ while (!waitfor_safe.empty()) {
+ auto it = waitfor_safe.begin();
+ if (it->first > safe_pos)
+ break;
+ ls.splice(ls.end(), it->second);
+ waitfor_safe.erase(it);
+ }
+ finish_contexts(cct, ls);
}
}
ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
<< write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
- if (write_off) {
- // current entry isn't being flushed, set next_safe_pos to the end of previous entry
+
+ // If _do_flush() skips flushing some data, it makes a best effort to
+ // update next_safe_pos.
+ if (write_buf.length() > 0 &&
+ write_buf.length() <= wrote) { // the unflushed data are within this entry
+ // set next_safe_pos to end of previous entry
next_safe_pos = write_pos - wrote;
}
}
next_safe_pos = write_pos;
} else {
write_buf.splice(0, len, &write_bl);
+ // Keys of waitfor_safe map are journal entry boundaries.
+ // Try finding a journal entry that we are actually flushing
+ // and set next_safe_pos to end of it. This is best effort.
+ // The one we found may not be the latest flushing entry.
+ auto p = waitfor_safe.lower_bound(flush_pos + len);
+ if (p != waitfor_safe.end()) {
+ if (p->first > flush_pos + len && p != waitfor_safe.begin())
+ --p;
+ if (p->first <= flush_pos + len && p->first > next_safe_pos)
+ next_safe_pos = p->first;
+ }
}
filer.write(ino, &layout, snapc,
if (pending_safe.empty()) {
_flush(NULL);
}
- waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
+
+ // Make sure keys of waitfor_safe map are journal entry boundaries.
+ // The key we used here is either next_safe_pos or old value of
+ // next_safe_pos. next_safe_pos is always set to journal entry
+ // boundary.
+ auto p = pending_safe.rbegin();
+ if (p != pending_safe.rend())
+ waitfor_safe[p->second].push_back(new C_RetryRead(this));
+ else
+ waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
return;
}
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
assert(write_buf.length() == 0);
+ assert(waitfor_safe.empty());
// reset read state
requested_pos = received_pos = read_pos;
{
lock_guard l(lock);
if (stopping) {
- onreadable->complete(-EAGAIN);
+ finisher->queue(onreadable, -EAGAIN);
return;
}