git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rados.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_rados.cc
index 70246ced0fa7ae5db8caf2c555b44f1e2797c662..19a83da62f12c1137676086bffbea68d7a5b5ac1 100644 (file)
@@ -1585,6 +1585,9 @@ int RGWRados::init_complete()
   data_notifier = new RGWDataNotifier(this);
   data_notifier->start();
 
+  binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
+  binfo_cache->init(svc.cache);
+
   lc = new RGWLC();
   lc->initialize(cct, this);
 
@@ -1602,9 +1605,6 @@ int RGWRados::init_complete()
   }
   ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
 
-  binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
-  binfo_cache->init(svc.cache);
-
   bool need_tombstone_cache = !svc.zone->get_zone_data_notify_to_map().empty(); /* have zones syncing from us */
 
   if (need_tombstone_cache) {
@@ -4839,9 +4839,10 @@ done_ret:
     vector<rgw_raw_obj>::iterator riter;
 
     /* rollback reference */
+    string ref_tag = tag + '\0';
     for (riter = ref_objs.begin(); riter != ref_objs.end(); ++riter) {
       ObjectWriteOperation op;
-      cls_refcount_put(op, tag, true);
+      cls_refcount_put(op, ref_tag, true);
 
       ref.ioctx.locator_set_key(riter->loc);
 
@@ -6287,9 +6288,15 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& sr
         state->attrset.erase(iter->first);
       }
     }
+
     for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
       state->attrset[iter->first] = iter->second;
     }
+
+    auto iter = state->attrset.find(RGW_ATTR_ID_TAG);
+    if (iter != state->attrset.end()) {
+      iter->second = state->obj_tag;
+    }
   }
 
   return 0;
@@ -7435,6 +7442,18 @@ int RGWRados::bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjSta
   return 0;
 }
 
+static int decode_olh_info(CephContext* cct, const bufferlist& bl, RGWOLHInfo *olh)
+{
+  try {
+    auto biter = bl.cbegin();
+    decode(*olh, biter);
+    return 0;
+  } catch (buffer::error& err) {
+    ldout(cct, 0) << "ERROR: failed to decode olh info" << dendl;
+    return -EIO;
+  }
+}
+
 int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
                             bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
                             uint64_t *plast_ver, rgw_zone_set* zones_trace)
@@ -7451,7 +7470,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
   map<uint64_t, vector<rgw_bucket_olh_log_entry> >::iterator iter = log.begin();
 
   op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
-  op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
+  op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GTE, last_ver);
 
   bufferlist ver_bl;
   string last_ver_s = to_string(last_ver);
@@ -7462,17 +7481,36 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
   op.mtime2(&mtime_ts);
 
   bool need_to_link = false;
+  uint64_t link_epoch = 0;
   cls_rgw_obj_key key;
   bool delete_marker = false;
   list<cls_rgw_obj_key> remove_instances;
   bool need_to_remove = false;
 
+  // decode current epoch and instance
+  auto olh_ver = state.attrset.find(RGW_ATTR_OLH_VER);
+  if (olh_ver != state.attrset.end()) {
+    std::string str = olh_ver->second.to_str();
+    std::string err;
+    link_epoch = strict_strtoll(str.c_str(), 10, &err);
+  }
+  auto olh_info = state.attrset.find(RGW_ATTR_OLH_INFO);
+  if (olh_info != state.attrset.end()) {
+    RGWOLHInfo info;
+    int r = decode_olh_info(cct, olh_info->second, &info);
+    if (r < 0) {
+      return r;
+    }
+    info.target.key.get_index_key(&key);
+    delete_marker = info.removed;
+  }
+
   for (iter = log.begin(); iter != log.end(); ++iter) {
     vector<rgw_bucket_olh_log_entry>::iterator viter = iter->second.begin();
     for (; viter != iter->second.end(); ++viter) {
       rgw_bucket_olh_log_entry& entry = *viter;
 
-      ldout(cct, 20) << "olh_log_entry: op=" << (int)entry.op
+      ldout(cct, 20) << "olh_log_entry: epoch=" << iter->first << " op=" << (int)entry.op
                      << " key=" << entry.key.name << "[" << entry.key.instance << "] "
                      << (entry.delete_marker ? "(delete)" : "") << dendl;
       switch (entry.op) {
@@ -7480,10 +7518,19 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGW
         remove_instances.push_back(entry.key);
         break;
       case CLS_RGW_OLH_OP_LINK_OLH:
-        need_to_link = true;
-        need_to_remove = false;
-        key = entry.key;
-        delete_marker = entry.delete_marker;
+        // only overwrite a link of the same epoch if its key sorts before
+        if (link_epoch < iter->first || key.instance.empty() ||
+            key.instance > entry.key.instance) {
+          ldout(cct, 20) << "apply_olh_log applying key=" << entry.key << " epoch=" << iter->first << " delete_marker=" << entry.delete_marker
+              << " over current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl;
+          need_to_link = true;
+          need_to_remove = false;
+          key = entry.key;
+          delete_marker = entry.delete_marker;
+        } else {
+          ldout(cct, 20) << "apply_olh skipping key=" << entry.key<< " epoch=" << iter->first << " delete_marker=" << entry.delete_marker
+              << " before current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl;
+        }
         break;
       case CLS_RGW_OLH_OP_UNLINK_OLH:
         need_to_remove = true;
@@ -7742,35 +7789,22 @@ void RGWRados::gen_rand_obj_instance_name(rgw_obj *target_obj)
 
 int RGWRados::get_olh(const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh)
 {
-  map<string, bufferlist> unfiltered_attrset;
+  map<string, bufferlist> attrset;
 
   ObjectReadOperation op;
-  op.getxattrs(&unfiltered_attrset, NULL);
+  op.getxattrs(&attrset, NULL);
 
-  bufferlist outbl;
   int r = obj_operate(bucket_info, obj, &op);
-
   if (r < 0) {
     return r;
   }
-  map<string, bufferlist> attrset;
-
-  rgw_filter_attrset(unfiltered_attrset, RGW_ATTR_OLH_PREFIX, &attrset);
 
-  map<string, bufferlist>::iterator iter = attrset.find(RGW_ATTR_OLH_INFO);
+  auto iter = attrset.find(RGW_ATTR_OLH_INFO);
   if (iter == attrset.end()) { /* not an olh */
     return -EINVAL;
   }
 
-  try {
-    auto biter = iter->second.cbegin();
-    decode(*olh, biter);
-  } catch (buffer::error& err) {
-    ldout(cct, 0) << "ERROR: failed to decode olh info" << dendl;
-    return -EIO;
-  }
-
-  return 0;
+  return decode_olh_info(cct, iter->second, olh);
 }
 
 void RGWRados::check_pending_olh_entries(map<string, bufferlist>& pending_entries, 
@@ -7861,15 +7895,15 @@ int RGWRados::follow_olh(const RGWBucketInfo& bucket_info, RGWObjectCtx& obj_ctx
     }
   }
 
-  map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_OLH_INFO);
-  ceph_assert(iter != state->attrset.end());
+  auto iter = state->attrset.find(RGW_ATTR_OLH_INFO);
+  if (iter == state->attrset.end()) {
+    return -EINVAL;
+  }
+
   RGWOLHInfo olh;
-  try {
-    auto biter = iter->second.cbegin();
-    decode(olh, biter);
-  } catch (buffer::error& err) {
-    ldout(cct, 0) << "ERROR: failed to decode olh info" << dendl;
-    return -EIO;
+  int ret = decode_olh_info(cct, iter->second, &olh);
+  if (ret < 0) {
+    return ret;
   }
 
   if (olh.removed) {
@@ -9138,10 +9172,11 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
 
   map<string, bufferlist> updates;
   uint32_t count = 0;
+  int pos = -1;
   while (count < num_entries && !candidates.empty()) {
     r = 0;
     // Select the next one
-    int pos = candidates.begin()->second;
+    pos = candidates.begin()->second;
     const string& name = vcurrents[pos]->first;
     struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
 
@@ -9197,8 +9232,9 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
       break;
     }
   }
-  if (!m.empty())
-    *last_entry = m.rbegin()->first;
+
+  if (pos >= 0)
+    *last_entry = std::move((--vcurrents[pos])->first);
 
   return 0;
 }