]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/PurgeQueue.cc
update source to 12.2.11
[ceph.git] / ceph / src / mds / PurgeQueue.cc
index fa5d43f6229f584c25e935b067c05aac15dd57a8..329c60e743e31a2e3cb79fa423e5513a98597d7e 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "PurgeQueue.h"
 
+#include <string.h>
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_mds
@@ -93,6 +94,7 @@ PurgeQueue::~PurgeQueue()
   if (logger) {
     g_ceph_context->get_perfcounters_collection()->remove(logger.get());
   }
+  delete on_error;
 }
 
 void PurgeQueue::create_logger()
@@ -123,6 +125,12 @@ void PurgeQueue::init()
 void PurgeQueue::activate()
 {
   Mutex::Locker l(lock);
+
+  if (readonly) {
+    dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
+    return;
+  }
+
   if (journaler.get_read_pos() == journaler.get_write_pos())
     return;
 
@@ -177,7 +185,7 @@ void PurgeQueue::open(Context *completion)
       finish_contexts(g_ceph_context, waiting_for_recovery);
     } else {
       derr << "Error " << r << " loading Journaler" << dendl;
-      on_error->complete(r);
+      _go_readonly(r);
     }
   }));
 }
@@ -185,10 +193,14 @@ void PurgeQueue::open(Context *completion)
 void PurgeQueue::wait_for_recovery(Context* c)
 {
   Mutex::Locker l(lock);
-  if (recovered)
+  if (recovered) {
     c->complete(0);
-  else
+  } else if (readonly) {
+    dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
+    c->complete(-EROFS);
+  } else {
     waiting_for_recovery.push_back(c);
+  }
 }
 
 void PurgeQueue::_recover()
@@ -210,7 +222,7 @@ void PurgeQueue::_recover()
     if (journaler.get_error()) {
       int r = journaler.get_error();
       derr << "Error " << r << " recovering write_pos" << dendl;
-      on_error->complete(r);
+      _go_readonly(r);
       return;
     }
 
@@ -244,8 +256,12 @@ void PurgeQueue::create(Context *fin)
   journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
   journaler.write_head(new FunctionContext([this](int r) {
     Mutex::Locker l(lock);
-    recovered = true;
-    finish_contexts(g_ceph_context, waiting_for_recovery);
+    if (r) {
+      _go_readonly(r);
+    } else {
+      recovered = true;
+      finish_contexts(g_ceph_context, waiting_for_recovery);
+    }
   }));
 }
 
@@ -257,6 +273,12 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
   dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
   Mutex::Locker l(lock);
 
+  if (readonly) {
+    dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
+    completion->complete(-EROFS);
+    return;
+  }
+
   // Callers should have waited for open() before using us
   assert(!journaler.is_readonly());
 
@@ -271,7 +293,7 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
   if (!could_consume) {
     // Usually, it is not necessary to explicitly flush here, because the reader
     // will get flushes generated inside Journaler::is_readable.  However,
-    // if we remain in a can_consume()==false state for a long period then
+    // if we remain in a _can_consume()==false state for a long period then
     // we should flush in order to allow MDCache to drop its strays rather
     // than having them wait for purgequeue to progress.
     if (!delayed_flush) {
@@ -317,8 +339,13 @@ uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
   return ops_required;
 }
 
-bool PurgeQueue::can_consume()
+bool PurgeQueue::_can_consume()
 {
+  if (readonly) {
+    dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
+    return false;
+  }
+
   dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
            << in_flight.size() << "/" << g_conf->mds_max_purge_files
            << " files" << dendl;
@@ -346,12 +373,23 @@ bool PurgeQueue::can_consume()
   }
 }
 
+void PurgeQueue::_go_readonly(int r)
+{
+  if (readonly) return;
+  dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
+  readonly = true;
+  on_error->complete(r);
+  on_error = nullptr;
+  journaler.set_readonly();
+  finish_contexts(g_ceph_context, waiting_for_recovery, r);
+}
+
 bool PurgeQueue::_consume()
 {
   assert(lock.is_locked_by_me());
 
   bool could_consume = false;
-  while(can_consume()) {
+  while(_can_consume()) {
 
     if (delayed_flush) {
       // We are now going to read from the journal, so any proactive
@@ -363,7 +401,7 @@ bool PurgeQueue::_consume()
 
     if (int r = journaler.get_error()) {
       derr << "Error " << r << " recovering write_pos" << dendl;
-      on_error->complete(r);
+      _go_readonly(r);
       return could_consume;
     }
 
@@ -377,7 +415,7 @@ bool PurgeQueue::_consume()
           if (r == 0) {
             _consume();
           } else if (r != -EAGAIN) {
-            on_error->complete(r);
+            _go_readonly(r);
           }
         }));
       }
@@ -399,7 +437,7 @@ bool PurgeQueue::_consume()
     } catch (const buffer::error &err) {
       derr << "Decode error at read_pos=0x" << std::hex
            << journaler.get_read_pos() << dendl;
-      on_error->complete(0);
+      _go_readonly(EIO);
     }
     dout(20) << " executing item (0x" << std::hex << item.ino
              << std::dec << ")" << dendl;
@@ -419,7 +457,8 @@ void PurgeQueue::_execute_item(
 
   in_flight[expire_to] = item;
   logger->set(l_pq_executing, in_flight.size());
-  ops_in_flight += _calculate_ops(item);
+  auto ops = _calculate_ops(item);
+  ops_in_flight += ops;
   logger->set(l_pq_executing_ops, ops_in_flight);
 
   SnapContext nullsnapc;
@@ -486,6 +525,8 @@ void PurgeQueue::_execute_item(
   } else {
     derr << "Invalid item (action=" << item.action << ") in purge queue, "
             "dropping it" << dendl;
+    ops_in_flight -= ops;
+    logger->set(l_pq_executing_ops, ops_in_flight);
     in_flight.erase(expire_to);
     logger->set(l_pq_executing, in_flight.size());
     return;
@@ -505,9 +546,7 @@ void PurgeQueue::_execute_item(
     // expire_pos doesn't fall too far behind our progress when consuming
     // a very long queue.
     if (in_flight.empty() || journaler.write_head_needed()) {
-      journaler.write_head(new FunctionContext([this](int r){
-            journaler.trim();
-            }));
+      journaler.write_head(nullptr);
     }
   }), &finisher));
 
@@ -551,6 +590,11 @@ void PurgeQueue::update_op_limit(const MDSMap &mds_map)
 {
   Mutex::Locker l(lock);
 
+  if (readonly) {
+    dout(10) << "skipping; PurgeQueue is readonly" << dendl;
+    return;
+  }
+
   uint64_t pg_count = 0;
   objecter->with_osdmap([&](const OSDMap& o) {
     // Number of PGs across all data pools
@@ -607,6 +651,13 @@ bool PurgeQueue::drain(
     size_t *in_flight_count
     )
 {
+  Mutex::Locker l(lock);
+
+  if (readonly) {
+    dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
+    return true;
+  }
+
   assert(progress != nullptr);
   assert(progress_total != nullptr);
   assert(in_flight_count != nullptr);