]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Journaler.cc
update sources to v12.1.0
[ceph.git] / ceph / src / osdc / Journaler.cc
index 5949ff3b4016cd4282279996f15719cd7281bdae..fe9a6e9cdaf2b043942406e543ea9716fb3cf7fb 100644 (file)
@@ -526,11 +526,16 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
                 << dendl;
 
   // kick waiters <= safe_pos
-  while (!waitfor_safe.empty()) {
-    if (waitfor_safe.begin()->first > safe_pos)
-      break;
-    finish_contexts(cct, waitfor_safe.begin()->second);
-    waitfor_safe.erase(waitfor_safe.begin());
+  if (!waitfor_safe.empty()) {
+    list<Context*> ls;
+    while (!waitfor_safe.empty()) {
+      auto it = waitfor_safe.begin();
+      if (it->first > safe_pos)
+       break;
+      ls.splice(ls.end(), it->second);
+      waitfor_safe.erase(it);
+    }
+    finish_contexts(cct, ls);
   }
 }
 
@@ -568,8 +573,12 @@ uint64_t Journaler::append_entry(bufferlist& bl)
     ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
                   << write_obj << " flo " << flush_obj << ")" << dendl;
     _do_flush(write_buf.length() - write_off);
-    if (write_off) {
-      // current entry isn't being flushed, set next_safe_pos to the end of previous entry
+
+    // if _do_flush() skips flushing some data, it does do a best effort to
+    // update next_safe_pos.
+    if (write_buf.length() > 0 &&
+       write_buf.length() <= wrote) { // the unflushed data are within this entry
+      // set next_safe_pos to end of previous entry
       next_safe_pos = write_pos - wrote;
     }
   }
@@ -634,6 +643,17 @@ void Journaler::_do_flush(unsigned amount)
     next_safe_pos = write_pos;
   } else {
     write_buf.splice(0, len, &write_bl);
+    // Keys of waitfor_safe map are journal entry boundaries.
+    // Try finding a journal entry that we are actually flushing
+    // and set next_safe_pos to end of it. This is best effort.
+    // The one we found may not be the lastest flushing entry.
+    auto p = waitfor_safe.lower_bound(flush_pos + len);
+    if (p != waitfor_safe.end()) {
+      if (p->first > flush_pos + len && p != waitfor_safe.begin())
+       --p;
+      if (p->first <= flush_pos + len && p->first > next_safe_pos)
+       next_safe_pos = p->first;
+    }
   }
 
   filer.write(ino, &layout, snapc,
@@ -952,7 +972,16 @@ void Journaler::_issue_read(uint64_t len)
     if (pending_safe.empty()) {
       _flush(NULL);
     }
-    waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
+
+    // Make sure keys of waitfor_safe map are journal entry boundaries.
+    // The key we used here is either next_safe_pos or old value of
+    // next_safe_pos. next_safe_pos is always set to journal entry
+    // boundary.
+    auto p = pending_safe.rbegin();
+    if (p != pending_safe.rend())
+      waitfor_safe[p->second].push_back(new C_RetryRead(this));
+    else
+      waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
     return;
   }
 
@@ -1062,6 +1091,7 @@ bool Journaler::_is_readable()
     // adjust write_pos
     prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
     assert(write_buf.length() == 0);
+    assert(waitfor_safe.empty());
 
     // reset read state
     requested_pos = received_pos = read_pos;
@@ -1196,7 +1226,7 @@ void Journaler::wait_for_readable(Context *onreadable)
 {
   lock_guard l(lock);
   if (stopping) {
-    onreadable->complete(-EAGAIN);
+    finisher->queue(onreadable, -EAGAIN);
     return;
   }