]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Journaler.cc
update sources to 12.2.7
[ceph.git] / ceph / src / osdc / Journaler.cc
index fe9a6e9cdaf2b043942406e543ea9716fb3cf7fb..8ca0c4b27188eddf8db2856db60ebb1fcb3bade4 100644 (file)
@@ -84,7 +84,11 @@ void Journaler::_set_layout(file_layout_t const *l)
 {
   layout = *l;
 
-  assert(layout.pool_id == pg_pool);
+  if (layout.pool_id != pg_pool) {
+    // user can reset pool id through cephfs-journal-tool
+    lderr(cct) << "may got older pool id from header layout" << dendl;
+    ceph_abort();
+  }
   last_written.layout = layout;
   last_committed.layout = layout;
 
@@ -155,7 +159,7 @@ public:
 void Journaler::recover(Context *onread) 
 {
   lock_guard l(lock);
-  if (stopping) {
+  if (is_stopping()) {
     onread->complete(-EAGAIN);
     return;
   }
@@ -215,6 +219,10 @@ void Journaler::_reread_head(Context *onfinish)
 void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    finish->complete(-EAGAIN);
+    return;
+  }
 
   //read on-disk header into
   assert(bl.length() || r < 0 );
@@ -243,6 +251,8 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 void Journaler::_finish_read_head(int r, bufferlist& bl)
 {
   lock_guard l(lock);
+  if (is_stopping())
+    return;
 
   assert(state == STATE_READHEAD);
 
@@ -333,6 +343,10 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
                                C_OnFinisher *onfinish)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    onfinish->complete(-EAGAIN);
+    return;
+  }
 
   assert(new_end >= write_pos || r < 0);
   ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
@@ -346,6 +360,8 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
 void Journaler::_finish_probe_end(int r, uint64_t end)
 {
   lock_guard l(lock);
+  if (is_stopping())
+    return;
 
   assert(state == STATE_PROBING);
   if (r < 0) { // error in probing
@@ -398,6 +414,10 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
 {
   // Expect to be called back from finish_reread_head, which already takes lock
   // lock is locked
+  if (is_stopping()) {
+    onfinish->complete(-EAGAIN);
+    return;
+  }
 
   assert(!r); //if we get an error, we're boned
   _reprobe(onfinish);
@@ -589,6 +609,8 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 
 void Journaler::_do_flush(unsigned amount)
 {
+  if (is_stopping())
+    return;
   if (write_pos == flush_pos)
     return;
   assert(write_pos > flush_pos);
@@ -611,19 +633,16 @@ void Journaler::_do_flush(unsigned amount)
       ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " already too close to prezero_pos " << prezero_pos
                     << ", zeroing first" << dendl;
-      waiting_for_zero = true;
+      waiting_for_zero_pos = flush_pos + len;
       return;
     }
     if (static_cast<uint64_t>(newlen) < len) {
       ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " but hit prezero_pos " << prezero_pos
                     << ", will do " << flush_pos << "~" << newlen << dendl;
+      waiting_for_zero_pos = flush_pos + len;
       len = newlen;
-    } else {
-      waiting_for_zero = false;
     }
-  } else {
-    waiting_for_zero = false;
   }
   ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
 
@@ -678,7 +697,7 @@ void Journaler::_do_flush(unsigned amount)
 void Journaler::wait_for_flush(Context *onsafe)
 {
   lock_guard l(lock);
-  if (stopping) {
+  if (is_stopping()) {
     onsafe->complete(-EAGAIN);
     return;
   }
@@ -711,6 +730,10 @@ void Journaler::_wait_for_flush(Context *onsafe)
 void Journaler::flush(Context *onsafe)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    onsafe->complete(-EAGAIN);
+    return;
+  }
   _flush(wrap_finisher(onsafe));
 }
 
@@ -827,8 +850,15 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
       pending_zero.erase(b);
     }
 
-    if (waiting_for_zero) {
-      _do_flush();
+    if (waiting_for_zero_pos > flush_pos) {
+      _do_flush(waiting_for_zero_pos - flush_pos);
+    }
+
+    if (prezero_pos == prezeroing_pos &&
+       !waitfor_prezero.empty()) {
+      list<Context*> ls;
+      ls.swap(waitfor_prezero);
+      finish_contexts(cct, ls, 0);
     }
   } else {
     pending_zero.insert(start, len);
@@ -839,6 +869,17 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
                 << dendl;
 }
 
+void Journaler::wait_for_prezero(Context *onfinish)
+{
+  assert(onfinish);
+  lock_guard l(lock);
+
+  if (prezero_pos == prezeroing_pos) {
+    finisher->queue(onfinish, 0);
+    return;
+  }
+  waitfor_prezero.push_back(wrap_finisher(onfinish));
+}
 
 
 /***************** READING *******************/
@@ -1018,6 +1059,9 @@ void Journaler::_issue_read(uint64_t len)
 
 void Journaler::_prefetch()
 {
+  if (is_stopping())
+    return;
+
   ldout(cct, 10) << "_prefetch" << dendl;
   // prefetch
   uint64_t pf;
@@ -1163,6 +1207,10 @@ void Journaler::erase(Context *completion)
 void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
 {
   lock_guard l(lock);
+  if (is_stopping()) {
+    completion->complete(-EAGAIN);
+    return;
+  }
 
   if (data_result == 0) {
     // Async delete the journal header
@@ -1225,7 +1273,7 @@ bool Journaler::try_read_entry(bufferlist& bl)
 void Journaler::wait_for_readable(Context *onreadable)
 {
   lock_guard l(lock);
-  if (stopping) {
+  if (is_stopping()) {
     finisher->queue(onreadable, -EAGAIN);
     return;
   }
@@ -1270,6 +1318,9 @@ void Journaler::trim()
 
 void Journaler::_trim()
 {
+  if (is_stopping())
+    return;
+
   assert(!readonly);
   uint64_t period = get_layout_period();
   uint64_t trim_to = last_committed.expire_pos;
@@ -1520,8 +1571,8 @@ void Journaler::shutdown()
 
   ldout(cct, 1) << __func__ << dendl;
 
+  state = STATE_STOPPING;
   readable = false;
-  stopping = true;
 
   // Kick out anyone reading from journal
   error = -EAGAIN;
@@ -1531,7 +1582,9 @@ void Journaler::shutdown()
     f->complete(-EAGAIN);
   }
 
-  finish_contexts(cct, waitfor_recover, -ESHUTDOWN);
+  list<Context*> ls;
+  ls.swap(waitfor_recover);
+  finish_contexts(cct, ls, -ESHUTDOWN);
 
   std::map<uint64_t, std::list<Context*> >::iterator i;
   for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {