]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_gc.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / rgw / rgw_gc.cc
index 8e6137e9676f92a48d56bd3cdabbb0433020a352..2267daa40cc55a1e9292b63876157f2f4c700338 100644 (file)
@@ -1,14 +1,21 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "rgw_gc.h"
+
+#include "rgw_tools.h"
+#include "include/scope_guard.h"
 #include "include/rados/librados.hpp"
 #include "cls/rgw/cls_rgw_client.h"
+#include "cls/rgw_gc/cls_rgw_gc_client.h"
 #include "cls/refcount/cls_refcount_client.h"
+#include "cls/version/cls_version_client.h"
+#include "rgw_perf_counters.h"
 #include "cls/lock/cls_lock_client.h"
 #include "include/random.h"
+#include "rgw_gc_log.h"
 
-#include <list>
+#include <list> // XXX
 #include <sstream>
 
 #define dout_context g_ceph_context
@@ -19,7 +26,6 @@ using namespace librados;
 static string gc_oid_prefix = "gc";
 static string gc_index_lock_name = "gc_process";
 
-
 void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
   cct = _cct;
   store = _store;
@@ -33,6 +39,17 @@ void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
     char buf[32];
     snprintf(buf, 32, ".%d", i);
     obj_names[i].append(buf);
+
+    auto it = transitioned_objects_cache.begin() + i;
+    transitioned_objects_cache.insert(it, false);
+
+    //version = 0 -> not ready for transition
+    //version = 1 -> marked ready for transition
+    librados::ObjectWriteOperation op;
+    op.create(false);
+    const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
+    gc_log_init2(op, queue_size, num_deferred_entries);
+    store->gc_operate(obj_names[i], &op);
   }
 }
 
@@ -46,74 +63,212 @@ int RGWGC::tag_index(const string& tag)
   return rgw_shard_id(tag, max_objs);
 }
 
-void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag)
+int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag)
 {
+  ObjectWriteOperation op;
   cls_rgw_gc_obj_info info;
   info.chain = chain;
   info.tag = tag;
+  gc_log_enqueue2(op, cct->_conf->rgw_gc_obj_min_wait, info);
+
+  int i = tag_index(tag);
+
+  ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << "tag is: " << tag << dendl;
+
+  auto ret = store->gc_operate(obj_names[i], &op);
+  if (ret != -ECANCELED && ret != -EPERM) {
+    return ret;
+  }
+  ObjectWriteOperation set_entry_op;
+  cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
+  return store->gc_operate(obj_names[i], &set_entry_op);
+}
+
+struct defer_chain_state {
+  librados::AioCompletion* completion = nullptr;
+  // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
+  // RGWGC destructs before this completion fires
+  RGWGC* gc = nullptr;
+  cls_rgw_gc_obj_info info;
+
+  ~defer_chain_state() {
+    if (completion) {
+      completion->release();
+    }
+  }
+};
 
-  cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
+static void async_defer_callback(librados::completion_t, void* arg)
+{
+  std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)};
+  if (state->completion->get_return_value() == -ECANCELED) {
+    state->gc->on_defer_canceled(state->info);
+  }
 }
 
-int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info)
 {
-  ObjectWriteOperation op;
-  add_chain(op, chain, tag);
+  const std::string& tag = info.tag;
+  const int i = tag_index(tag);
 
-  int i = tag_index(tag);
+  // ECANCELED from cls_version_check() tells us that we've transitioned
+  transitioned_objects_cache[i] = true;
 
-  if (sync)
-    return store->gc_operate(obj_names[i], &op);
+  ObjectWriteOperation op;
+  cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
+  cls_rgw_gc_remove(op, {tag});
 
-  return store->gc_aio_operate(obj_names[i], &op);
+  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
+  store->gc_aio_operate(obj_names[i], c, &op);
+  c->release();
 }
 
-int RGWGC::defer_chain(const string& tag, bool sync)
+int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain)
 {
-  ObjectWriteOperation op;
-  cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
+  const int i = tag_index(tag);
+  cls_rgw_gc_obj_info info;
+  info.chain = chain;
+  info.tag = tag;
 
-  int i = tag_index(tag);
+  // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
+  if (transitioned_objects_cache[i]) {
+    ObjectWriteOperation op;
+    cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
 
-  if (sync)
-    return store->gc_operate(obj_names[i], &op);
+    // this tag may still be present in omap, so remove it once the cls_rgw_gc
+    // enqueue succeeds
+    cls_rgw_gc_remove(op, {tag});
+
+    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
+    int ret = store->gc_aio_operate(obj_names[i], c, &op);
+    c->release();
+    return ret;
+  }
 
-  return store->gc_aio_operate(obj_names[i], &op);
+  // if we haven't seen the transition yet, write the defer to omap with cls_rgw
+  ObjectWriteOperation op;
+
+  // assert that we haven't initialized cls_rgw_gc queue. this prevents us
+  // from writing new entries to omap after the transition
+  gc_log_defer1(op, cct->_conf->rgw_gc_obj_min_wait, info);
+
+  // prepare a callback to detect the transition via ECANCELED from cls_version_check()
+  auto state = std::make_unique<defer_chain_state>();
+  state->gc = this;
+  state->info.chain = chain;
+  state->info.tag = tag;
+  state->completion = librados::Rados::aio_create_completion(
+      state.get(), async_defer_callback);
+
+  int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
+  if (ret == 0) {
+    state.release(); // release ownership until async_defer_callback()
+  }
+  return ret;
 }
 
 int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
 {
   ObjectWriteOperation op;
   cls_rgw_gc_remove(op, tags);
-  return store->gc_aio_operate(obj_names[index], &op, pc);
+
+  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
+  int ret = store->gc_aio_operate(obj_names[index], c, &op);
+  if (ret < 0) {
+    c->release();
+  } else {
+    *pc = c;
+  }
+  return ret;
 }
 
-int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
+int RGWGC::remove(int index, int num_entries)
+{
+  ObjectWriteOperation op;
+  cls_rgw_gc_queue_remove_entries(op, num_entries);
+
+  return store->gc_operate(obj_names[index], &op);
+}
+
+int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
 {
   result.clear();
   string next_marker;
+  bool check_queue = false;
 
-  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear()) {
-    std::list<cls_rgw_gc_obj_info> entries;
-    int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
-    if (ret == -ENOENT)
+  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear(), check_queue = false) {
+    std::list<cls_rgw_gc_obj_info> entries, queue_entries;
+    int ret = 0;
+
+    //processing_queue is set to true from previous iteration if the queue was under process and probably has more elements in it.
+    if (! transitioned_objects_cache[*index] && ! check_queue && ! processing_queue) {
+      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
+      if (ret != -ENOENT && ret < 0) {
+        return ret;
+      }
+      obj_version objv;
+      cls_version_read(store->gc_pool_ctx, obj_names[*index], &objv);
+      if (ret == -ENOENT || entries.size() == 0) {
+        if (objv.ver == 0) {
+          continue;
+        } else {
+          if (! expired_only) {
+            transitioned_objects_cache[*index] = true;
+            marker.clear();
+          } else {
+            std::list<cls_rgw_gc_obj_info> non_expired_entries;
+            ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, 1, false, non_expired_entries, truncated, next_marker);
+            if (non_expired_entries.size() == 0) {
+              transitioned_objects_cache[*index] = true;
+              marker.clear();
+            }
+          }
+        }
+      }
+      if ((objv.ver == 1) && (entries.size() < max - result.size())) {
+        check_queue = true;
+        marker.clear();
+      }
+    }
+    if (transitioned_objects_cache[*index] || check_queue || processing_queue) {
+      processing_queue = false;
+      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, (max - result.size()) - entries.size(), expired_only, queue_entries, truncated, next_marker);
+      if (ret < 0) {
+        return ret;
+      }
+    }
+    if (entries.size() == 0 && queue_entries.size() == 0)
       continue;
-    if (ret < 0)
-      return ret;
 
     std::list<cls_rgw_gc_obj_info>::iterator iter;
     for (iter = entries.begin(); iter != entries.end(); ++iter) {
       result.push_back(*iter);
     }
 
+    for (iter = queue_entries.begin(); iter != queue_entries.end(); ++iter) {
+      result.push_back(*iter);
+    }
+
     marker = next_marker;
 
     if (*index == max_objs - 1) {
+      if (queue_entries.size() > 0 && *truncated) {
+        processing_queue = true;
+      } else {
+        processing_queue = false;
+      }
       /* we cut short here, truncated will hold the correct value */
       return 0;
     }
 
     if (result.size() == max) {
+      if (queue_entries.size() > 0 && *truncated) {
+        processing_queue = true;
+      } else {
+        processing_queue = false;
+        *index += 1; //move to next gc object
+      }
+
       /* close approximation, it might be that the next of the objects don't hold
        * anything, in this case truncated should have been false, but we can find
        * that out on the next iteration
@@ -121,9 +276,9 @@ int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std
       *truncated = true;
       return 0;
     }
-
   }
   *truncated = false;
+  processing_queue = false;
 
   return 0;
 }
@@ -147,6 +302,10 @@ class RGWGCIOManager {
 
   deque<IO> ios;
   vector<std::vector<string> > remove_tags;
+  /* tracks the number of remaining shadow objects for a given tag in order to
+   * only remove the tag once all shadow objects have themselves been removed
+   */
+  vector<map<string, size_t> > tag_io_size;
 
 #define MAX_AIO_DEFAULT 10
   size_t max_aio{MAX_AIO_DEFAULT};
@@ -155,7 +314,8 @@ public:
   RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
                                                   cct(_cct),
                                                   gc(_gc),
-                                                  remove_tags(cct->_conf->rgw_gc_max_objs) {
+                                                  remove_tags(cct->_conf->rgw_gc_max_objs),
+                                                  tag_io_size(cct->_conf->rgw_gc_max_objs) {
     max_aio = cct->_conf->rgw_gc_max_concurrent_io;
   }
 
@@ -171,10 +331,14 @@ public:
       if (gc->going_down()) {
         return 0;
       }
-      handle_next_completion();
+      auto ret = handle_next_completion();
+      //Return error if we are using queue, else ignore it
+      if (gc->transitioned_objects_cache[index] && ret < 0) {
+        return ret;
+      }
     }
 
-    AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
     int ret = ioctx->aio_operate(oid, c, op);
     if (ret < 0) {
       return ret;
@@ -184,10 +348,10 @@ public:
     return 0;
   }
 
-  void handle_next_completion() {
+  int handle_next_completion() {
     ceph_assert(!ios.empty());
     IO& io = ios.front();
-    io.c->wait_for_safe();
+    io.c->wait_for_complete();
     int ret = io.c->get_return_value();
     io.c->release();
 
@@ -195,7 +359,7 @@ public:
       ret = 0;
     }
 
-    if (io.type == IO::IndexIO) {
+    if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
          io.index << " returned error, ret=" << ret << dendl;
@@ -209,33 +373,59 @@ public:
       goto done;
     }
 
-    schedule_tag_removal(io.index, io.tag);
+    if (! gc->transitioned_objects_cache[io.index]) {
+      schedule_tag_removal(io.index, io.tag);
+    }
 
   done:
     ios.pop_front();
+    return ret;
   }
 
+  /* This is a request to schedule a tag removal. It will be called once when
+   * there are no shadow objects. But it will also be called for every shadow
+   * object when there are any. Since we do not want the tag to be removed
+   * until all shadow objects have been successfully removed, the scheduling
+   * will not happen until the shadow object count goes down to zero
+   */
   void schedule_tag_removal(int index, string tag) {
+    auto& ts = tag_io_size[index];
+    auto ts_it = ts.find(tag);
+    if (ts_it != ts.end()) {
+      auto& size = ts_it->second;
+      --size;
+      // wait all shadow obj delete return
+      if (size != 0)
+        return;
+
+      ts.erase(ts_it);
+    }
+
     auto& rt = remove_tags[index];
 
-    // since every element of a chain tries to add the same tag, and
-    // since chains are handled sequentially, check to make sure it's
-    // not already on the list
-    if (rt.empty() || rt.back() != tag) {
-      rt.push_back(tag);
-      if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
-       flush_remove_tags(index, rt);
-      }
+    rt.push_back(tag);
+    if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
+      flush_remove_tags(index, rt);
     }
   }
 
-  void drain_ios() {
+  void add_tag_io_size(int index, string tag, size_t size) {
+    auto& ts = tag_io_size[index];
+    ts.emplace(tag, size);
+  }
+
+  int drain_ios() {
+    int ret_val = 0;
     while (!ios.empty()) {
       if (gc->going_down()) {
-        return;
+        return -EAGAIN;
+      }
+      auto ret = handle_next_completion();
+      if (ret < 0) {
+        ret_val = ret;
       }
-      handle_next_completion();
     }
+    return ret_val;
   }
 
   void drain() {
@@ -250,30 +440,18 @@ public:
     index_io.type = IO::IndexIO;
     index_io.index = index;
 
-    // use lambda to assemble list, so it will only get executed if
-    // we're at the appropirate logging level
-    auto lister = [&rt]() -> std::string {
-      std::stringstream out;
-      bool first = true;
-
-      for (const auto& s : rt) {
-       if (first) {
-         first = false;
-       } else {
-         out << ", ";
-       }
-       out << s;
-      }
-
-      return out.str();
-    };
-
     ldpp_dout(dpp, 20) << __func__ <<
       " removing entries from gc log shard index=" << index << ", size=" <<
-      rt.size() << ", entries=[" << lister() << "]" << dendl;
+      rt.size() << ", entries=" << rt << dendl;
+
+    auto rt_guard = make_scope_guard(
+      [&]
+       {
+         rt.clear();
+       }
+      );
 
     int ret = gc->remove(index, rt, &index_io.c);
-    rt.clear();
     if (ret < 0) {
       /* we already cleared list of tags, this prevents us from
        * ballooning in case of a persistent problem
@@ -282,17 +460,36 @@ public:
        index << " ret=" << ret << dendl;
       return;
     }
-
+    if (perfcounter) {
+      /* log the count of tags retired for rate estimation */
+      perfcounter->inc(l_rgw_gc_retire, rt.size());
+    }
     ios.push_back(index_io);
   }
 
   void flush_remove_tags() {
     int index = 0;
     for (auto& rt : remove_tags) {
-      flush_remove_tags(index, rt);
+      if (! gc->transitioned_objects_cache[index]) {
+        flush_remove_tags(index, rt);
+      }
       ++index;
     }
   }
+
+  int remove_queue_entries(int index, int num_entries) {
+    int ret = gc->remove(index, num_entries);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
+           index << " ret=" << ret << dendl;
+      return ret;
+    }
+    if (perfcounter) {
+      /* log the count of tags retired for rate estimation */
+      perfcounter->inc(l_rgw_gc_retire, num_entries);
+    }
+    return 0;
+  }
 }; // class RGWGCIOManger
 
 int RGWGC::process(int index, int max_secs, bool expired_only,
@@ -333,17 +530,46 @@ int RGWGC::process(int index, int max_secs, bool expired_only,
     int max = 100;
     std::list<cls_rgw_gc_obj_info> entries;
 
-    ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max,
-                         expired_only, entries, &truncated, next_marker);
-    ldpp_dout(this, 20) <<
+    int ret = 0;
+
+    if (! transitioned_objects_cache[index]) {
+      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
+      ldpp_dout(this, 20) <<
       "RGWGC::process cls_rgw_gc_list returned with returned:" << ret <<
       ", entries.size=" << entries.size() << ", truncated=" << truncated <<
       ", next_marker='" << next_marker << "'" << dendl;
+      obj_version objv;
+      cls_version_read(store->gc_pool_ctx, obj_names[index], &objv);
+      if ((objv.ver == 1) && entries.size() == 0) {
+        std::list<cls_rgw_gc_obj_info> non_expired_entries;
+        ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, 1, false, non_expired_entries, &truncated, next_marker);
+        if (non_expired_entries.size() == 0) {
+          transitioned_objects_cache[index] = true;
+          marker.clear();
+          ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned NO non expired entries, so setting cache entry to TRUE" << dendl;
+        } else {
+          ret = 0;
+          goto done;
+        }
+      }
+      if ((objv.ver == 0) && (ret == -ENOENT || entries.size() == 0)) {
+        ret = 0;
+        goto done;
+      }
+    }
 
-    if (ret == -ENOENT) {
-      ret = 0;
-      goto done;
+    if (transitioned_objects_cache[index]) {
+      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
+      ldpp_dout(this, 20) <<
+      "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret <<
+      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
+      ", next_marker='" << next_marker << "'" << dendl;
+      if (entries.size() == 0) {
+        ret = 0;
+        goto done;
+      }
     }
+
     if (ret < 0)
       goto done;
 
@@ -365,10 +591,14 @@ int RGWGC::process(int index, int max_secs, bool expired_only,
       if (now >= end) {
         goto done;
       }
-
-      if (chain.objs.empty()) {
-        io_manager.schedule_tag_removal(index, info.tag);
-      } else {
+      if (! transitioned_objects_cache[index]) {
+        if (chain.objs.empty()) {
+          io_manager.schedule_tag_removal(index, info.tag);
+        } else {
+          io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
+        }
+      }
+      if (! chain.objs.empty()) {
        for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
          cls_rgw_obj& obj = *liter;
 
@@ -377,6 +607,9 @@ int RGWGC::process(int index, int max_secs, bool expired_only,
            ctx = new IoCtx;
            ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
            if (ret < 0) {
+        if (transitioned_objects_cache[index]) {
+          goto done;
+        }
              last_pool = "";
              ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
                obj.pool << dendl;
@@ -398,6 +631,10 @@ int RGWGC::process(int index, int max_secs, bool expired_only,
          if (ret < 0) {
            ldpp_dout(this, 0) <<
              "WARNING: failed to schedule deletion for oid=" << oid << dendl;
+      if (transitioned_objects_cache[index]) {
+        //If deleting oid failed for any of them, we will not delete queue entries
+        goto done;
+      }
          }
          if (going_down()) {
            // leave early, even if tag isn't removed, it's ok since it
@@ -407,6 +644,20 @@ int RGWGC::process(int index, int max_secs, bool expired_only,
        } // chains loop
       } // else -- chains not empty
     } // entries loop
+    if (transitioned_objects_cache[index] && entries.size() > 0) {
+      ret = io_manager.drain_ios();
+      if (ret < 0) {
+        goto done;
+      }
+      //Remove the entries from the queue
+      ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
+      ret = io_manager.remove_queue_entries(index, entries.size());
+      if (ret < 0) {
+        ldpp_dout(this, 0) <<
+          "WARNING: failed to remove queue entries" << dendl;
+        goto done;
+      }
+    }
   } while (truncated);
 
 done:
@@ -494,9 +745,8 @@ void *RGWGC::GCWorker::entry() {
 
     secs -= end.sec();
 
-    lock.Lock();
-    cond.WaitInterval(lock, utime_t(secs, 0));
-    lock.Unlock();
+    std::unique_lock locker{lock};
+    cond.wait_for(locker, std::chrono::seconds(secs));
   } while (!gc->going_down());
 
   return NULL;
@@ -504,6 +754,6 @@ void *RGWGC::GCWorker::entry() {
 
 void RGWGC::GCWorker::stop()
 {
-  Mutex::Locker l(lock);
-  cond.Signal();
+  std::lock_guard l{lock};
+  cond.notify_all();
 }