]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/PurgeQueue.cc
import 15.2.4
[ceph.git] / ceph / src / mds / PurgeQueue.cc
index 3e992c396d06da75337cfddecff1b71fa33b348d..eace818ec3d8fec63854d175d8df526c5f05ac93 100644 (file)
@@ -57,15 +57,39 @@ void PurgeItem::encode(bufferlist &bl) const
 void PurgeItem::decode(bufferlist::const_iterator &p)
 {
   DECODE_START(2, p);
-  decode((uint8_t&)action, p);
-  decode(ino, p);
-  decode(size, p);
-  decode(layout, p);
-  decode(old_pools, p);
-  decode(snapc, p);
-  decode(fragtree, p);
-  if (struct_v >= 2) {
-    decode(stamp, p);
+  bool done = false;
+  if (struct_v == 1) {
+    auto p_start = p;
+    try {
+      // bad encoding introduced by v13.2.2
+      decode(stamp, p);
+      decode(pad_size, p);
+      p += pad_size;
+      decode((uint8_t&)action, p);
+      decode(ino, p);
+      decode(size, p);
+      decode(layout, p);
+      decode(old_pools, p);
+      decode(snapc, p);
+      decode(fragtree, p);
+      if (p.get_off() > struct_end)
+       throw buffer::end_of_buffer();
+      done = true;
+    } catch (const buffer::error &e) {
+      p = p_start;
+    }
+  }
+  if (!done) {
+    decode((uint8_t&)action, p);
+    decode(ino, p);
+    decode(size, p);
+    decode(layout, p);
+    decode(old_pools, p);
+    decode(snapc, p);
+    decode(fragtree, p);
+    if (struct_v >= 2) {
+      decode(stamp, p);
+    }
   }
   DECODE_FINISH(p);
 }
@@ -81,7 +105,6 @@ PurgeQueue::PurgeQueue(
   :
     cct(cct_),
     rank(rank_),
-    lock("PurgeQueue"),
     metadata_pool(metadata_pool_),
     finisher(cct, "PurgeQueue", "PQ_Finisher"),
     timer(cct, lock),
@@ -90,13 +113,7 @@ PurgeQueue::PurgeQueue(
     journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
       CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
       &finisher),
-    on_error(on_error_),
-    ops_in_flight(0),
-    max_purge_ops(0),
-    drain_initial(0),
-    draining(false),
-    delayed_flush(nullptr),
-    recovered(false)
+    on_error(on_error_)
 {
   ceph_assert(cct != nullptr);
   ceph_assert(on_error != nullptr);
@@ -124,6 +141,7 @@ void PurgeQueue::create_logger()
   pcb.add_u64(l_pq_executing_ops_high_water, "pq_executing_ops_high_water", "Maximum number of executing file purge ops");
   pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
   pcb.add_u64(l_pq_executing_high_water, "pq_executing_high_water", "Maximum number of executing file purges");
+  pcb.add_u64(l_pq_item_in_journal, "pq_item_in_journal", "Purge item left in journal");
 
   logger.reset(pcb.create_perf_counters());
   g_ceph_context->get_perfcounters_collection()->add(logger.get());
@@ -143,6 +161,16 @@ void PurgeQueue::activate()
 {
   std::lock_guard l(lock);
 
+  {
+    PurgeItem item;
+    bufferlist bl;
+
+    // calculate purge item serialized size stored in journal
+    // used to count how many items still left in journal later
+    ::encode(item, bl);
+    purge_item_journal_size = bl.length() + journaler.get_journal_envelope_size(); 
+  }
+
   if (readonly) {
     dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
     return;
@@ -153,7 +181,7 @@ void PurgeQueue::activate()
 
   if (in_flight.empty()) {
     dout(4) << "start work (by drain)" << dendl;
-    finisher.queue(new FunctionContext([this](int r) {
+    finisher.queue(new LambdaContext([this](int r) {
          std::lock_guard l(lock);
          _consume();
          }));
@@ -178,7 +206,7 @@ void PurgeQueue::open(Context *completion)
   if (completion)
     waiting_for_recovery.push_back(completion);
 
-  journaler.recover(new FunctionContext([this](int r){
+  journaler.recover(new LambdaContext([this](int r){
     if (r == -ENOENT) {
       dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                  "creating it." << dendl;
@@ -222,14 +250,14 @@ void PurgeQueue::wait_for_recovery(Context* c)
 
 void PurgeQueue::_recover()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   // Journaler::is_readable() adjusts write_pos if partial entry is encountered
   while (1) {
     if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
-      journaler.wait_for_readable(new FunctionContext([this](int r) {
+      journaler.wait_for_readable(new LambdaContext([this](int r) {
         std::lock_guard l(lock);
        _recover();
       }));
@@ -271,7 +299,7 @@ void PurgeQueue::create(Context *fin)
   layout.pool_id = metadata_pool;
   journaler.set_writeable();
   journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
-  journaler.write_head(new FunctionContext([this](int r) {
+  journaler.write_head(new LambdaContext([this](int r) {
     std::lock_guard l(lock);
     if (r) {
       _go_readonly(r);
@@ -314,7 +342,7 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
     // we should flush in order to allow MDCache to drop its strays rather
     // than having them wait for purgequeue to progress.
     if (!delayed_flush) {
-      delayed_flush = new FunctionContext([this](int r){
+      delayed_flush = new LambdaContext([this](int r){
             delayed_flush = nullptr;
             journaler.flush();
           });
@@ -395,7 +423,7 @@ void PurgeQueue::_go_readonly(int r)
   if (readonly) return;
   dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
   readonly = true;
-  on_error->complete(r);
+  finisher.queue(on_error, r);
   on_error = nullptr;
   journaler.set_readonly();
   finish_contexts(g_ceph_context, waiting_for_recovery, r);
@@ -403,7 +431,7 @@ void PurgeQueue::_go_readonly(int r)
 
 bool PurgeQueue::_consume()
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   bool could_consume = false;
   while(_can_consume()) {
@@ -427,7 +455,7 @@ bool PurgeQueue::_consume()
       // Because we are the writer and the reader of the journal
       // via the same Journaler instance, we never need to reread_head
       if (!journaler.have_waiter()) {
-        journaler.wait_for_readable(new FunctionContext([this](int r) {
+        journaler.wait_for_readable(new LambdaContext([this](int r) {
           std::lock_guard l(lock);
           if (r == 0) {
             _consume();
@@ -469,7 +497,7 @@ void PurgeQueue::_execute_item(
     const PurgeItem &item,
     uint64_t expire_to)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   in_flight[expire_to] = item;
   logger->set(l_pq_executing, in_flight.size());
@@ -558,10 +586,16 @@ void PurgeQueue::_execute_item(
   ceph_assert(gather.has_subs());
 
   gather.set_finisher(new C_OnFinisher(
-                      new FunctionContext([this, expire_to](int r){
+                      new LambdaContext([this, expire_to](int r){
     std::lock_guard l(lock);
-    _execute_item_complete(expire_to);
 
+    if (r == -EBLACKLISTED) {
+      finisher.queue(on_error, r);
+      on_error = nullptr;
+      return;
+    }
+
+    _execute_item_complete(expire_to);
     _consume();
 
     // Have we gone idle?  If so, do an extra write_head now instead of
@@ -569,7 +603,8 @@ void PurgeQueue::_execute_item(
     // Also do this periodically even if not idle, so that the persisted
     // expire_pos doesn't fall too far behind our progress when consuming
     // a very long queue.
-    if (in_flight.empty() || journaler.write_head_needed()) {
+    if (!readonly &&
+       (in_flight.empty() || journaler.write_head_needed())) {
       journaler.write_head(nullptr);
     }
   }), &finisher));
@@ -580,7 +615,7 @@ void PurgeQueue::_execute_item(
 void PurgeQueue::_execute_item_complete(
     uint64_t expire_to)
 {
-  ceph_assert(lock.is_locked_by_me());
+  ceph_assert(ceph_mutex_is_locked_by_me(lock));
   dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
   ceph_assert(in_flight.count(expire_to) == 1);
 
@@ -626,6 +661,17 @@ void PurgeQueue::_execute_item_complete(
   logger->set(l_pq_executing_high_water, files_high_water);
   dout(10) << "in_flight.size() now " << in_flight.size() << dendl;
 
+  uint64_t write_pos = journaler.get_write_pos(); 
+  uint64_t read_pos = journaler.get_read_pos(); 
+  uint64_t expire_pos = journaler.get_expire_pos(); 
+  uint64_t item_num = (write_pos - (in_flight.size() ? expire_pos : read_pos)) 
+                     / purge_item_journal_size;
+  dout(10) << "left purge items in journal: " << item_num 
+    << " (purge_item_journal_size/write_pos/read_pos/expire_pos) now at " 
+    << "(" << purge_item_journal_size << "/" << write_pos << "/" << read_pos 
+    << "/" << expire_pos << ")" << dendl;
+
+  logger->set(l_pq_item_in_journal, item_num);
   logger->inc(l_pq_executed);
 }
 
@@ -677,7 +723,7 @@ void PurgeQueue::handle_conf_change(const std::set<std::string>& changed, const
       // might need to kick off consume.
       dout(4) << "maybe start work again (max_purge_files="
               << g_conf()->mds_max_purge_files << dendl;
-      finisher.queue(new FunctionContext([this](int r){
+      finisher.queue(new LambdaContext([this](int r){
         std::lock_guard l(lock);
         _consume();
       }));