]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_object_expirer_core.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_object_expirer_core.cc
index b2e302baab67926b50dd5715f9258100e6873fd3..370001bff993999c816a2081a8feb9d53f5c8a8d 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <errno.h>
 #include <iostream>
 
 #include "rgw_user.h"
 #include "rgw_bucket.h"
-#include "rgw_rados.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h"
 #include "rgw_log.h"
 #include "rgw_formats.h"
 #include "rgw_usage.h"
 #include "rgw_object_expirer_core.h"
+#include "rgw_zone.h"
+#include "rgw_sal_rados.h"
 
+#include "services/svc_rados.h"
+#include "services/svc_zone.h"
 #include "services/svc_sys_obj.h"
+#include "services/svc_bi_rados.h"
 
 #include "cls/lock/cls_lock_client.h"
+#include "cls/timeindex/cls_timeindex_client.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+using namespace std;
+
 static string objexp_lock_name = "gc_process";
 
-int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
-                                       const string& bucket_name,
-                                       const string& bucket_id,
-                                       RGWBucketInfo& bucket_info)
+static string objexp_hint_get_shardname(int shard_num)
 {
-  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
+  char buf[64];
+  snprintf(buf, sizeof(buf), "obj_delete_at_hint.%010u", (unsigned)shard_num);
+  return buf;
+}
 
-  /*
-   * XXX Here's where it gets tricky. We went to all the trouble of
-   * punching the tenant through the objexp_hint_entry, but now we
-   * find that our instances do not actually have tenants. They are
-   * unique thanks to IDs. So the tenant string is not needed...
+static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards)
+{
+  string obj_key = key.name + key.instance;
+  return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key, num_shards);
+}
 
-   * XXX reloaded: it turns out tenants were needed after all since bucket ids
-   * are ephemeral, good call encoding tenant info!
-   */
+static string objexp_hint_get_keyext(const string& tenant_name,
+                                     const string& bucket_name,
+                                     const string& bucket_id,
+                                     const rgw_obj_key& obj_key) {
+  return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
+    ":" + obj_key.name + ":" + obj_key.instance;
+}
 
-  return store->get_bucket_info(obj_ctx, tenant_name, bucket_name,
-                               bucket_info, nullptr, nullptr);
+static void objexp_get_shard(int shard_num,
+                             string *shard)
+{
+  *shard = objexp_hint_get_shardname(shard_num);
+}
 
+static int objexp_hint_parse(const DoutPrefixProvider *dpp, CephContext *cct, cls_timeindex_entry &ti_entry,
+                             objexp_hint_entry *hint_entry)
+{
+  try {
+    auto iter = ti_entry.value.cbegin();
+    decode(*hint_entry, iter);
+  } catch (buffer::error& err) {
+    ldpp_dout(dpp, 0) << "ERROR: couldn't decode avail_pools" << dendl;
+  }
+
+  return 0;
 }
 
-int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
+int RGWObjExpStore::objexp_hint_add(const DoutPrefixProvider *dpp, 
+                              const ceph::real_time& delete_at,
+                              const string& tenant_name,
+                              const string& bucket_name,
+                              const string& bucket_id,
+                              const rgw_obj_index_key& obj_key)
+{
+  const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
+          bucket_id, obj_key);
+  objexp_hint_entry he = {
+      .tenant = tenant_name,
+      .bucket_name = bucket_name,
+      .bucket_id = bucket_id,
+      .obj_key = obj_key,
+      .exp_time = delete_at };
+  bufferlist hebl;
+  encode(he, hebl);
+  librados::ObjectWriteOperation op;
+  cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
+
+  string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards));
+  auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, shard_name));
+  int r = obj.open(dpp);
+  if (r < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  return obj.operate(dpp, &op, null_yield);
+}
+
+int RGWObjExpStore::objexp_hint_list(const DoutPrefixProvider *dpp, 
+                               const string& oid,
+                               const ceph::real_time& start_time,
+                               const ceph::real_time& end_time,
+                               const int max_entries,
+                               const string& marker,
+                               list<cls_timeindex_entry>& entries, /* out */
+                               string *out_marker,                 /* out */
+                               bool *truncated)                    /* out */
+{
+  librados::ObjectReadOperation op;
+  cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
+        out_marker, truncated);
+
+  auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, oid));
+  int r = obj.open(dpp);
+  if (r < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  bufferlist obl;
+  int ret = obj.operate(dpp, &op, &obl, null_yield);
+
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  if ((ret == -ENOENT) && truncated) {
+    *truncated = false;
+  }
+
+  return 0;
+}
+
+static int cls_timeindex_trim_repeat(const DoutPrefixProvider *dpp, 
+                                rgw_rados_ref ref,
+                                const string& oid,
+                                const utime_t& from_time,
+                                const utime_t& to_time,
+                                const string& from_marker,
+                                const string& to_marker)
+{
+  bool done = false;
+  do {
+    librados::ObjectWriteOperation op;
+    cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker);
+    int r = rgw_rados_operate(dpp, ref.pool.ioctx(), oid, &op, null_yield);
+    if (r == -ENODATA)
+      done = true;
+    else if (r < 0)
+      return r;
+  } while (!done);
+
+  return 0;
+}
+
+int RGWObjExpStore::objexp_hint_trim(const DoutPrefixProvider *dpp, 
+                               const string& oid,
+                               const ceph::real_time& start_time,
+                               const ceph::real_time& end_time,
+                               const string& from_marker,
+                               const string& to_marker)
+{
+  auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, oid));
+  int r = obj.open(dpp);
+  if (r < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  auto& ref = obj.get_ref();
+  int ret = cls_timeindex_trim_repeat(dpp, ref, oid, utime_t(start_time), utime_t(end_time),
+          from_marker, to_marker);
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWObjectExpirer::garbage_single_object(const DoutPrefixProvider *dpp, objexp_hint_entry& hint)
 {
   RGWBucketInfo bucket_info;
+  std::unique_ptr<rgw::sal::Bucket> bucket;
 
-  int ret = init_bucket_info(hint.tenant, hint.bucket_name,
-          hint.bucket_id, bucket_info);
+  int ret = store->get_bucket(dpp, nullptr, rgw_bucket(hint.tenant, hint.bucket_name, hint.bucket_id), &bucket, null_yield);
   if (-ENOENT == ret) {
-    ldout(store->ctx(), 15) << "NOTICE: cannot find bucket = " \
+    ldpp_dout(dpp, 15) << "NOTICE: cannot find bucket = " \
         << hint.bucket_name << ". The object must be already removed" << dendl;
     return -ERR_PRECONDITION_FAILED;
   } else if (ret < 0) {
-    ldout(store->ctx(),  1) << "ERROR: could not init bucket = " \
+    ldpp_dout(dpp, 1) << "ERROR: could not init bucket = " \
         << hint.bucket_name << "due to ret = " << ret << dendl;
     return ret;
   }
@@ -85,15 +219,15 @@ int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
     key.instance = "null";
   }
 
-  rgw_obj obj(bucket_info.bucket, key);
-  store->set_atomic(&rctx, obj);
-  ret = store->delete_obj(rctx, bucket_info, obj,
-          bucket_info.versioning_status(), 0, hint.exp_time);
+  std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(key);
+  obj->set_atomic(&rctx);
+  ret = obj->delete_object(dpp, &rctx, null_yield);
 
   return ret;
 }
 
-void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /* in  */
+void RGWObjectExpirer::garbage_chunk(const DoutPrefixProvider *dpp, 
+                                  list<cls_timeindex_entry>& entries,      /* in  */
                                   bool& need_trim)                         /* out */
 {
   need_trim = false;
@@ -103,22 +237,22 @@ void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /*
        ++iter)
   {
     objexp_hint_entry hint;
-    ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \
+    ldpp_dout(dpp, 15) << "got removal hint for: " << iter->key_ts.sec() \
         << " - " << iter->key_ext << dendl;
 
-    int ret = store->objexp_hint_parse(*iter, hint);
+    int ret = objexp_hint_parse(dpp, store->ctx(), *iter, &hint);
     if (ret < 0) {
-      ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
+      ldpp_dout(dpp, 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
       continue;
     }
 
     /* PRECOND_FAILED simply means that our hint is not valid.
      * We can silently ignore that and move forward. */
-    ret = garbage_single_object(hint);
+    ret = garbage_single_object(dpp, hint);
     if (ret == -ERR_PRECONDITION_FAILED) {
-      ldout(store->ctx(), 15) << "not actual hint for object: " << hint.obj_key << dendl;
+      ldpp_dout(dpp, 15) << "not actual hint for object: " << hint.obj_key << dendl;
     } else if (ret < 0) {
-      ldout(store->ctx(), 1) << "cannot remove expired object: " << hint.obj_key << dendl;
+      ldpp_dout(dpp, 1) << "cannot remove expired object: " << hint.obj_key << dendl;
     }
 
     need_trim = true;
@@ -127,28 +261,30 @@ void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /*
   return;
 }
 
-void RGWObjectExpirer::trim_chunk(const string& shard,
+void RGWObjectExpirer::trim_chunk(const DoutPrefixProvider *dpp, 
+                                  const string& shard,
                                   const utime_t& from,
                                   const utime_t& to,
                                   const string& from_marker,
                                   const string& to_marker)
 {
-  ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
+  ldpp_dout(dpp, 20) << "trying to trim removal hints to=" << to
                           << ", to_marker=" << to_marker << dendl;
 
   real_time rt_from = from.to_real_time();
   real_time rt_to = to.to_real_time();
 
-  int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
-                                    from_marker, to_marker);
+  int ret = exp_store.objexp_hint_trim(dpp, shard, rt_from, rt_to,
+                                       from_marker, to_marker);
   if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
+    ldpp_dout(dpp, 0) << "ERROR during trim: " << ret << dendl;
   }
 
   return;
 }
 
-bool RGWObjectExpirer::process_single_shard(const string& shard,
+bool RGWObjectExpirer::process_single_shard(const DoutPrefixProvider *dpp, 
+                                            const string& shard,
                                             const utime_t& last_run,
                                             const utime_t& round_start)
 {
@@ -169,9 +305,9 @@ bool RGWObjectExpirer::process_single_shard(const string& shard,
   utime_t time(max_secs, 0);
   l.set_duration(time);
 
-  int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
+  int ret = l.lock_exclusive(&static_cast<rgw::sal::RadosStore*>(store)->getRados()->objexp_pool_ctx, shard);
   if (ret == -EBUSY) { /* already locked by another processor */
-    dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
+    ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
     return false;
   }
 
@@ -180,20 +316,20 @@ bool RGWObjectExpirer::process_single_shard(const string& shard,
     real_time rt_start = round_start.to_real_time();
 
     list<cls_timeindex_entry> entries;
-    ret = store->objexp_hint_list(shard, rt_last, rt_start,
-                                  num_entries, marker, entries,
-                                  &out_marker, &truncated);
+    ret = exp_store.objexp_hint_list(dpp, shard, rt_last, rt_start,
+                                     num_entries, marker, entries,
+                                     &out_marker, &truncated);
     if (ret < 0) {
-      ldout(cct, 10) << "cannot get removal hints from shard: " << shard
+      ldpp_dout(dpp, 10) << "cannot get removal hints from shard: " << shard
                      << dendl;
       continue;
     }
 
     bool need_trim;
-    garbage_chunk(entries, need_trim);
+    garbage_chunk(dpp, entries, need_trim);
 
     if (need_trim) {
-      trim_chunk(shard, last_run, round_start, marker, out_marker);
+      trim_chunk(dpp, shard, last_run, round_start, marker, out_marker);
     }
 
     utime_t now = ceph_clock_now();
@@ -205,12 +341,13 @@ bool RGWObjectExpirer::process_single_shard(const string& shard,
     marker = out_marker;
   } while (truncated);
 
-  l.unlock(&store->objexp_pool_ctx, shard);
+  l.unlock(&static_cast<rgw::sal::RadosStore*>(store)->getRados()->objexp_pool_ctx, shard);
   return done;
 }
 
 /* Returns true if all shards have been processed successfully. */
-bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
+bool RGWObjectExpirer::inspect_all_shards(const DoutPrefixProvider *dpp, 
+                                          const utime_t& last_run,
                                           const utime_t& round_start)
 {
   CephContext * const cct = store->ctx();
@@ -219,11 +356,11 @@ bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
 
   for (int i = 0; i < num_shards; i++) {
     string shard;
-    store->objexp_get_shard(i, shard);
+    objexp_get_shard(i, &shard);
 
-    ldout(store->ctx(), 20) << "processing shard = " << shard << dendl;
+    ldpp_dout(dpp, 20) << "processing shard = " << shard << dendl;
 
-    if (! process_single_shard(shard, last_run, round_start)) {
+    if (! process_single_shard(dpp, shard, last_run, round_start)) {
       all_done = false;
     }
   }
@@ -257,13 +394,13 @@ void *RGWObjectExpirer::OEWorker::entry() {
   utime_t last_run;
   do {
     utime_t start = ceph_clock_now();
-    ldout(cct, 2) << "object expiration: start" << dendl;
-    if (oe->inspect_all_shards(last_run, start)) {
+    ldpp_dout(this, 2) << "object expiration: start" << dendl;
+    if (oe->inspect_all_shards(this, last_run, start)) {
       /* All shards have been processed properly. Next time we can start
        * from this moment. */
       last_run = start;
     }
-    ldout(cct, 2) << "object expiration: stop" << dendl;
+    ldpp_dout(this, 2) << "object expiration: stop" << dendl;
 
 
     if (oe->going_down())
@@ -278,9 +415,8 @@ void *RGWObjectExpirer::OEWorker::entry() {
 
     secs -= end.sec();
 
-    lock.Lock();
-    cond.WaitInterval(lock, utime_t(secs, 0));
-    lock.Unlock();
+    std::unique_lock l{lock};
+    cond.wait_for(l, std::chrono::seconds(secs));
   } while (!oe->going_down());
 
   return NULL;
@@ -288,7 +424,21 @@ void *RGWObjectExpirer::OEWorker::entry() {
 
 void RGWObjectExpirer::OEWorker::stop()
 {
-  Mutex::Locker l(lock);
-  cond.Signal();
+  std::lock_guard l{lock};
+  cond.notify_all();
+}
+
+CephContext *RGWObjectExpirer::OEWorker::get_cct() const 
+{ 
+  return cct; 
+}
+
+unsigned RGWObjectExpirer::OEWorker::get_subsys() const 
+{
+    return dout_subsys;
 }
 
+std::ostream& RGWObjectExpirer::OEWorker::gen_prefix(std::ostream& out) const 
+{ 
+  return out << "rgw object expirer Worker thread: "; 
+}