]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_op.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_op.cc
index 72a4ec86fe28cdb889e6f6409486a4e8cddc3e45..98a6db4703f2caa1471757c95ff06ec00a5dc096 100644 (file)
@@ -76,41 +76,42 @@ static int forward_request_to_master(struct req_state *s, obj_version *objv, RGW
 
 static MultipartMetaFilter mp_filter;
 
-static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_content)
+int RGWGetObj::parse_range(void)
 {
   int r = -ERANGE;
-  string s(range);
+  string rs(range_str);
   string ofs_str;
   string end_str;
 
-  *partial_content = false;
+  ignore_invalid_range = s->cct->_conf->rgw_ignore_get_invalid_range;
+  partial_content = false;
 
-  size_t pos = s.find("bytes=");
+  size_t pos = rs.find("bytes=");
   if (pos == string::npos) {
     pos = 0;
-    while (isspace(s[pos]))
+    while (isspace(rs[pos]))
       pos++;
     int end = pos;
-    while (isalpha(s[end]))
+    while (isalpha(rs[end]))
       end++;
-    if (strncasecmp(s.c_str(), "bytes", end - pos) != 0)
+    if (strncasecmp(rs.c_str(), "bytes", end - pos) != 0)
       return 0;
-    while (isspace(s[end]))
+    while (isspace(rs[end]))
       end++;
-    if (s[end] != '=')
+    if (rs[end] != '=')
       return 0;
-    s = s.substr(end + 1);
+    rs = rs.substr(end + 1);
   } else {
-    s = s.substr(pos + 6); /* size of("bytes=")  */
+    rs = rs.substr(pos + 6); /* size of("bytes=")  */
   }
-  pos = s.find('-');
+  pos = rs.find('-');
   if (pos == string::npos)
     goto done;
 
-  *partial_content = true;
+  partial_content = true;
 
-  ofs_str = s.substr(0, pos);
-  end_str = s.substr(pos + 1);
+  ofs_str = rs.substr(0, pos);
+  end_str = rs.substr(pos + 1);
   if (end_str.length()) {
     end = atoll(end_str.c_str());
     if (end < 0)
@@ -127,8 +128,18 @@ static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_
   if (end >= 0 && end < ofs)
     goto done;
 
-  r = 0;
+  range_parsed = true;
+  return 0;
+
 done:
+  if (ignore_invalid_range) {
+    partial_content = false;
+    ofs = 0;
+    end = -1;
+    range_parsed = false; // allow retry
+    r = 0;
+  }
+
   return r;
 }
 
@@ -495,6 +506,8 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
        */
       if (store->get_zonegroup().is_master_zonegroup() && s->system_request) {
         /*If this is the master, don't redirect*/
+      } else if (s->op_type == RGW_OP_GET_BUCKET_LOCATION ) {
+        /* If op is get bucket location, don't redirect */
       } else if (!s->local_source ||
           (s->op != OP_PUT && s->op != OP_COPY) ||
           s->object.empty()) {
@@ -583,56 +596,53 @@ rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
   rgw::IAM::Environment e;
   const auto& m = s->info.env->get_map();
   auto t = ceph::real_clock::now();
-  e.emplace(std::piecewise_construct,
-           std::forward_as_tuple("aws:CurrentTime"),
-           std::forward_as_tuple(std::to_string(
-                                   ceph::real_clock::to_time_t(t))));
-  e.emplace(std::piecewise_construct,
-           std::forward_as_tuple("aws:EpochTime"),
-           std::forward_as_tuple(ceph::to_iso_8601(t)));
+  e.emplace("aws:CurrentTime", std::to_string(ceph::real_clock::to_time_t(t)));
+  e.emplace("aws:EpochTime", ceph::to_iso_8601(t));
   // TODO: This is fine for now, but once we have STS we'll need to
   // look and see. Also this won't work with the IdentityApplier
   // model, since we need to know the actual credential.
-  e.emplace(std::piecewise_construct,
-           std::forward_as_tuple("aws:PrincipalType"),
-           std::forward_as_tuple("User"));
+  e.emplace("aws:PrincipalType", "User");
 
   auto i = m.find("HTTP_REFERER");
   if (i != m.end()) {
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:Referer"),
-             std::forward_as_tuple(i->second));
+    e.emplace("aws:Referer", i->second);
   }
 
   // These seem to be the semantics, judging from rest_rgw_s3.cc
   i = m.find("SERVER_PORT_SECURE");
   if (i != m.end()) {
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:SecureTransport"),
-             std::forward_as_tuple("true"));
+    e.emplace("aws:SecureTransport", "true");
   }
 
-  i = m.find("HTTP_HOST");
+  const auto remote_addr_param = s->cct->_conf->rgw_remote_addr_param;
+  if (remote_addr_param.length()) {
+    i = m.find(remote_addr_param);
+  } else {
+    i = m.find("REMOTE_ADDR");
+  }
   if (i != m.end()) {
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:SourceIp"),
-             std::forward_as_tuple(i->second));
+    const string* ip = &(i->second);
+    string temp;
+    if (remote_addr_param == "HTTP_X_FORWARDED_FOR") {
+      const auto comma = ip->find(',');
+      if (comma != string::npos) {
+       temp.assign(*ip, 0, comma);
+       ip = &temp;
+      }
+    }
+    e.emplace("aws:SourceIp", *ip);
   }
 
   i = m.find("HTTP_USER_AGENT"); {
   if (i != m.end())
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:UserAgent"),
-             std::forward_as_tuple(i->second));
+    e.emplace("aws:UserAgent", i->second);
   }
 
   if (s->user) {
     // What to do about aws::userid? One can have multiple access
     // keys so that isn't really suitable. Do we have a durable
     // identifier that can persist through name changes?
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:username"),
-             std::forward_as_tuple(s->user->user_id.id));
+    e.emplace("aws:username", s->user->user_id.id);
   }
   return e;
 }
@@ -645,6 +655,37 @@ void rgw_bucket_object_pre_exec(struct req_state *s)
   dump_bucket_from_state(s);
 }
 
+// So! Now and then when we try to update bucket information, the
+// bucket has changed during the course of the operation. (Or we have
+// a cache consistency problem that Watch/Notify isn't ruling out
+// completely.)
+//
+// When this happens, we need to update the bucket info and try
+// again. We have, however, to try the right *part* again.  We can't
+// simply re-send, since that will obliterate the previous update.
+//
+// Thus, callers of this function should include everything that
+// merges information to be changed into the bucket information as
+// well as the call to set it.
+//
+// The called function must return an integer, negative on error. In
+// general, they should just return op_ret.
+namespace {
+template<typename F>
+int retry_raced_bucket_write(RGWRados* g, req_state* s, const F& f) {
+  auto r = f();
+  for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
+    r = g->try_refresh_bucket_info(s->bucket_info, nullptr,
+                                  &s->bucket_attrs);
+    if (r >= 0) {
+      r = f();
+    }
+  }
+  return r;
+}
+}
+
+
 int RGWGetObj::verify_permission()
 {
   obj = rgw_obj(s->bucket, s->object);
@@ -720,6 +761,12 @@ void RGWGetObjTags::execute()
   store->set_atomic(s->obj_ctx, obj);
 
   op_ret = get_obj_attrs(store, s, obj, attrs);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "ERROR: failed to get obj attrs, obj=" << obj
+                    << " ret=" << op_ret << dendl;
+    return;
+  }
+
   auto tags = attrs.find(RGW_ATTR_TAGS);
   if(tags != attrs.end()){
     has_tags = true;
@@ -879,6 +926,20 @@ static bool validate_cors_rule_method(RGWCORSRule *rule, const char *req_meth) {
   return true;
 }
 
+static bool validate_cors_rule_header(RGWCORSRule *rule, const char *req_hdrs) {
+  if (req_hdrs) {
+    vector<string> hdrs;
+    get_str_vec(req_hdrs, hdrs);
+    for (const auto& hdr : hdrs) {
+      if (!rule->is_header_allowed(hdr.c_str(), hdr.length())) {
+        dout(5) << "Header " << hdr << " is not registered in this rule" << dendl;
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
 int RGWOp::read_bucket_cors()
 {
   bufferlist bl;
@@ -1538,16 +1599,13 @@ bool RGWGetObj::prefetch_data()
   bool prefetch_first_chunk = true;
   range_str = s->info.env->get("HTTP_RANGE");
 
-  if(range_str) {
-    int r = parse_range(range_str, ofs, end, &partial_content);
-    /* error on parsing the range, stop prefetch and will fail in execte() */
+  if (range_str) {
+    int r = parse_range();
+    /* error on parsing the range, stop prefetch and will fail in execute() */
     if (r < 0) {
-      range_parsed = false;
-      return false;
-    } else {
-      range_parsed = true;
+      return false; /* range_parsed==false */
     }
-    /* range get goes to shadown objects, stop prefetch */
+    /* range get goes to shadow objects, stop prefetch */
     if (ofs >= s->cct->_conf->rgw_max_chunk_size) {
       prefetch_first_chunk = false;
     }
@@ -1555,6 +1613,7 @@ bool RGWGetObj::prefetch_data()
 
   return get_data && prefetch_first_chunk;
 }
+
 void RGWGetObj::pre_exec()
 {
   rgw_bucket_object_pre_exec(s);
@@ -1632,8 +1691,15 @@ void RGWGetObj::execute()
   /* start gettorrent */
   if (torrent.get_flag())
   {
+    attr_iter = attrs.find(RGW_ATTR_CRYPT_MODE);
+    if (attr_iter != attrs.end() && attr_iter->second.to_str() == "SSE-C-AES256") {
+      ldout(s->cct, 0) << "ERROR: torrents are not supported for objects "
+          "encrypted with SSE-C" << dendl;
+      op_ret = -EINVAL;
+      goto done_err;
+    }
     torrent.init(s, store);
-    torrent.get_torrent_file(op_ret, read_op, total_len, bl, obj);
+    op_ret = torrent.get_torrent_file(read_op, total_len, bl, obj);
     if (op_ret < 0)
     {
       ldout(s->cct, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret
@@ -1755,9 +1821,9 @@ done_err:
 int RGWGetObj::init_common()
 {
   if (range_str) {
-    /* range parsed error when prefetch*/
+    /* range parsed error when prefetch */
     if (!range_parsed) {
-      int r = parse_range(range_str, ofs, end, &partial_content);
+      int r = parse_range();
       if (r < 0)
         return r;
     }
@@ -1801,7 +1867,7 @@ void RGWListBuckets::execute()
   bool started = false;
   uint64_t total_count = 0;
 
-  uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
+  const uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
 
   op_ret = get_params();
   if (op_ret < 0) {
@@ -1836,15 +1902,32 @@ void RGWListBuckets::execute()
                        << s->user->user_id << dendl;
       break;
     }
-    map<string, RGWBucketEnt>& m = buckets.get_buckets();
-    map<string, RGWBucketEnt>::iterator iter;
-    for (iter = m.begin(); iter != m.end(); ++iter) {
-      RGWBucketEnt& bucket = iter->second;
-      buckets_size += bucket.size;
-      buckets_size_rounded += bucket.size_rounded;
-      buckets_objcount += bucket.count;
+
+    /* We need to have stats for all our policies - even if a given policy
+     * isn't actually used in a given account. In such situation its usage
+     * stats would be simply full of zeros. */
+    for (const auto& policy : store->get_zonegroup().placement_targets) {
+      policies_stats.emplace(policy.second.name,
+                             decltype(policies_stats)::mapped_type());
     }
-    buckets_count += m.size();
+
+    std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+    for (const auto& kv : m) {
+      const auto& bucket = kv.second;
+
+      global_stats.bytes_used += bucket.size;
+      global_stats.bytes_used_rounded += bucket.size_rounded;
+      global_stats.objects_count += bucket.count;
+
+      /* operator[] still can create a new entry for storage policy seen
+       * for first time. */
+      auto& policy_stats = policies_stats[bucket.placement_rule];
+      policy_stats.bytes_used += bucket.size;
+      policy_stats.bytes_used_rounded += bucket.size_rounded;
+      policy_stats.buckets_count++;
+      policy_stats.objects_count += bucket.count;
+    }
+    global_stats.buckets_count += m.size();
     total_count += m.size();
 
     done = (m.size() < read_count || (limit >= 0 && total_count >= (uint64_t)limit));
@@ -1855,10 +1938,10 @@ void RGWListBuckets::execute()
     }
 
     if (!m.empty()) {
-      send_response_data(buckets);
-
       map<string, RGWBucketEnt>::reverse_iterator riter = m.rbegin();
       marker = riter->first;
+
+      handle_listing_chunk(std::move(buckets));
     }
   } while (is_truncated && !done);
 
@@ -1962,17 +2045,31 @@ void RGWStatAccount::execute()
                        << s->user->user_id << dendl;
       break;
     } else {
-      map<string, RGWBucketEnt>& m = buckets.get_buckets();
-      map<string, RGWBucketEnt>::iterator iter;
-      for (iter = m.begin(); iter != m.end(); ++iter) {
-        RGWBucketEnt& bucket = iter->second;
-        buckets_size += bucket.size;
-        buckets_size_rounded += bucket.size_rounded;
-        buckets_objcount += bucket.count;
-
-        marker = iter->first;
+      /* We need to have stats for all our policies - even if a given policy
+       * isn't actually used in a given account. In such situation its usage
+       * stats would be simply full of zeros. */
+      for (const auto& policy : store->get_zonegroup().placement_targets) {
+        policies_stats.emplace(policy.second.name,
+                               decltype(policies_stats)::mapped_type());
       }
-      buckets_count += m.size();
+
+      std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+      for (const auto& kv : m) {
+        const auto& bucket = kv.second;
+
+        global_stats.bytes_used += bucket.size;
+        global_stats.bytes_used_rounded += bucket.size_rounded;
+        global_stats.objects_count += bucket.count;
+
+        /* operator[] still can create a new entry for storage policy seen
+         * for first time. */
+        auto& policy_stats = policies_stats[bucket.placement_rule];
+        policy_stats.bytes_used += bucket.size;
+        policy_stats.bytes_used_rounded += bucket.size_rounded;
+        policy_stats.buckets_count++;
+        policy_stats.objects_count += bucket.count;
+      }
+      global_stats.buckets_count += m.size();
 
     }
   } while (is_truncated);
@@ -1980,11 +2077,7 @@ void RGWStatAccount::execute()
 
 int RGWGetBucketVersioning::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketVersioning);
 }
 
 void RGWGetBucketVersioning::pre_exec()
@@ -2000,11 +2093,7 @@ void RGWGetBucketVersioning::execute()
 
 int RGWSetBucketVersioning::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketVersioning);
 }
 
 void RGWSetBucketVersioning::pre_exec()
@@ -2026,15 +2115,18 @@ void RGWSetBucketVersioning::execute()
     }
   }
 
-  if (enable_versioning) {
-    s->bucket_info.flags |= BUCKET_VERSIONED;
-    s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
-  } else {
-    s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      if (enable_versioning) {
+       s->bucket_info.flags |= BUCKET_VERSIONED;
+       s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
+      } else {
+       s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
+      }
+
+      return store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                            &s->bucket_attrs);
+    });
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
-                                         &s->bucket_attrs);
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
                     << " returned err=" << op_ret << dendl;
@@ -2044,10 +2136,7 @@ void RGWSetBucketVersioning::execute()
 
 int RGWGetBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketWebsite);
 }
 
 void RGWGetBucketWebsite::pre_exec()
@@ -2058,16 +2147,13 @@ void RGWGetBucketWebsite::pre_exec()
 void RGWGetBucketWebsite::execute()
 {
   if (!s->bucket_info.has_website) {
-    op_ret = -ENOENT;
+    op_ret = -ERR_NO_SUCH_WEBSITE_CONFIGURATION;
   }
 }
 
 int RGWSetBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketWebsite);
 }
 
 void RGWSetBucketWebsite::pre_exec()
@@ -2090,10 +2176,14 @@ void RGWSetBucketWebsite::execute()
     }
   }
 
-  s->bucket_info.has_website = true;
-  s->bucket_info.website_conf = website_conf;
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      s->bucket_info.has_website = true;
+      s->bucket_info.website_conf = website_conf;
+      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+                                              real_time(), &s->bucket_attrs);
+      return op_ret;
+    });
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
     return;
@@ -2102,10 +2192,7 @@ void RGWSetBucketWebsite::execute()
 
 int RGWDeleteBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3DeleteBucketWebsite);
 }
 
 void RGWDeleteBucketWebsite::pre_exec()
@@ -2115,10 +2202,13 @@ void RGWDeleteBucketWebsite::pre_exec()
 
 void RGWDeleteBucketWebsite::execute()
 {
-  s->bucket_info.has_website = false;
-  s->bucket_info.website_conf = RGWBucketWebsiteConf();
-
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      s->bucket_info.has_website = false;
+      s->bucket_info.website_conf = RGWBucketWebsiteConf();
+      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+                                              real_time(), &s->bucket_attrs);
+      return op_ret;
+    });
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
     return;
@@ -2171,6 +2261,13 @@ int RGWListBucket::verify_permission()
   if (op_ret < 0) {
     return op_ret;
   }
+  if (!prefix.empty())
+    s->env.emplace("s3:prefix", prefix);
+
+  if (!delimiter.empty())
+    s->env.emplace("s3:delimiter", delimiter);
+
+  s->env.emplace("s3:max-keys", std::to_string(max));
 
   if (!verify_bucket_permission(s,
                                list_versions ?
@@ -2188,6 +2285,7 @@ int RGWListBucket::parse_max_keys()
     char *endptr;
     max = strtol(max_keys.c_str(), &endptr, 10);
     if (endptr) {
+      if (endptr == max_keys.c_str()) return -EINVAL;
       while (*endptr && isspace(*endptr)) // ignore white space
         endptr++;
       if (*endptr) {
@@ -2243,20 +2341,12 @@ void RGWListBucket::execute()
 
 int RGWGetBucketLogging::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketLogging);
 }
 
 int RGWGetBucketLocation::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketLocation);
 }
 
 int RGWCreateBucket::verify_permission()
@@ -2523,7 +2613,7 @@ void RGWCreateBucket::execute()
       return;
   }
 
-  if (!store->get_zonegroup().is_master_zonegroup() &&
+  if (!store->get_zonegroup().is_master_zonegroup() && !location_constraint.empty() &&
       store->get_zonegroup().api_name != location_constraint) {
     ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
                      << " doesn't match zonegroup" << " (" << store->get_zonegroup().api_name << ")"
@@ -2633,7 +2723,10 @@ void RGWCreateBucket::execute()
   if (need_metadata_upload()) {
     /* It's supposed that following functions WILL NOT change any special
      * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
-    rgw_get_request_metadata(s->cct, s->info, attrs, false);
+    op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+    if (op_ret < 0) {
+      return;
+    }
     prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
     populate_with_generic_attrs(s, attrs);
 
@@ -2726,7 +2819,10 @@ void RGWCreateBucket::execute()
 
       attrs.clear();
 
-      rgw_get_request_metadata(s->cct, s->info, attrs, false);
+      op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+      if (op_ret < 0) {
+        return;
+      }
       prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
       populate_with_generic_attrs(s, attrs);
       op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
@@ -2857,7 +2953,7 @@ void RGWDeleteBucket::execute()
   }
 
   if (op_ret == 0) {
-    op_ret = rgw_unlink_bucket(store, s->user->user_id, s->bucket.tenant,
+    op_ret = rgw_unlink_bucket(store, s->bucket_info.owner, s->bucket.tenant,
                               s->bucket.name, false);
     if (op_ret < 0) {
       ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
@@ -2874,7 +2970,7 @@ void RGWDeleteBucket::execute()
 
 int RGWPutObj::verify_permission()
 {
-  if (copy_source) {
+  if (! copy_source.empty()) {
 
     RGWAccessControlPolicy cs_acl(s->cct);
     optional<Policy> policy;
@@ -2887,8 +2983,8 @@ int RGWPutObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, obj);
 
     /* check source object permissions */
-    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl, policy,
-                       cs_bucket, cs_object) < 0) {
+    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl,
+                       policy, cs_bucket, cs_object) < 0) {
       return -EACCES;
     }
 
@@ -3011,6 +3107,7 @@ int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
   head_obj_op.meta.owner = s->owner.get_id();
   head_obj_op.meta.delete_at = delete_at;
   head_obj_op.meta.zones_trace = zones_trace;
+  head_obj_op.meta.modify_tail = true;
 
   int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
   if (r < 0)
@@ -3321,8 +3418,27 @@ void RGWPutObj::execute()
     goto done;
   }
 
+  if ((! copy_source.empty()) && !copy_source_range) {
+    rgw_obj_key obj_key(copy_source_object_name, copy_source_version_id);
+    rgw_obj obj(copy_source_bucket_info.bucket, obj_key.name);
+
+    RGWObjState *astate;
+    op_ret = store->get_obj_state(static_cast<RGWObjectCtx *>(s->obj_ctx),
+                                  copy_source_bucket_info, obj, &astate, true, false);
+    if (op_ret < 0) {
+      ldout(s->cct, 0) << "ERROR: get copy source obj state returned with error" << op_ret << dendl;
+      goto done;
+    }
+    if (!astate->exists){
+      op_ret = -ENOENT;
+      goto done;
+    }
+    lst = astate->accounted_size - 1;
+  } else {
+    lst = copy_source_range_lst;
+  }
+
   fst = copy_source_range_fst;
-  lst = copy_source_range_lst;
 
   op_ret = get_encrypt_filter(&encrypt, filter);
   if (op_ret < 0) {
@@ -3348,7 +3464,7 @@ void RGWPutObj::execute()
     bufferlist data;
     if (fst > lst)
       break;
-    if (!copy_source) {
+    if (copy_source.empty()) {
       len = get_data(data);
     } else {
       uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
@@ -3524,7 +3640,10 @@ void RGWPutObj::execute()
   emplace_attr(RGW_ATTR_ETAG, std::move(bl));
 
   populate_with_generic_attrs(s, attrs);
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    goto done;
+  }
   encode_delete_at_attr(delete_at, attrs);
   encode_obj_tags_attr(obj_tags.get(), attrs);
 
@@ -3838,7 +3957,10 @@ int RGWPutMetadataAccount::init_processing()
     attrs.emplace(RGW_ATTR_ACL, std::move(acl_bl));
   }
 
-  rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  if (op_ret < 0) {
+    return op_ret;
+  }
   prepare_add_del_attrs(orig_attrs, rmattr_names, attrs);
   populate_with_generic_attrs(s, attrs);
 
@@ -3930,7 +4052,10 @@ void RGWPutMetadataBucket::execute()
     return;
   }
 
-  rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  if (op_ret < 0) {
+    return;
+  }
 
   if (!placement_rule.empty() &&
       placement_rule != s->bucket_info.placement_rule) {
@@ -3938,55 +4063,61 @@ void RGWPutMetadataBucket::execute()
     return;
   }
 
-  /* Encode special metadata first as we're using std::map::emplace under
-   * the hood. This method will add the new items only if the map doesn't
-   * contain such keys yet. */
-  if (has_policy) {
-    if (s->dialect.compare("swift") == 0) {
-       auto old_policy = \
-          static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
-       auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
-       new_policy->filter_merge(policy_rw_mask, old_policy);
-       policy = *new_policy;
-    }
-    buffer::list bl;
-    policy.encode(bl);
-    emplace_attr(RGW_ATTR_ACL, std::move(bl));
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      /* Encode special metadata first as we're using std::map::emplace under
+       * the hood. This method will add the new items only if the map doesn't
+       * contain such keys yet. */
+      if (has_policy) {
+       if (s->dialect.compare("swift") == 0) {
+         auto old_policy =                                             \
+           static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
+         auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
+         new_policy->filter_merge(policy_rw_mask, old_policy);
+         policy = *new_policy;
+       }
+       buffer::list bl;
+       policy.encode(bl);
+       emplace_attr(RGW_ATTR_ACL, std::move(bl));
+      }
 
-  if (has_cors) {
-    buffer::list bl;
-    cors_config.encode(bl);
-    emplace_attr(RGW_ATTR_CORS, std::move(bl));
-  }
+      if (has_cors) {
+       buffer::list bl;
+       cors_config.encode(bl);
+       emplace_attr(RGW_ATTR_CORS, std::move(bl));
+      }
 
-  /* It's supposed that following functions WILL NOT change any special
-   * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
-  prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
-  populate_with_generic_attrs(s, attrs);
+      /* It's supposed that following functions WILL NOT change any
+       * special attributes (like RGW_ATTR_ACL) if they are already
+       * present in attrs. */
+      prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
+      populate_with_generic_attrs(s, attrs);
 
-  /* According  to the Swift's behaviour and its container_quota WSGI middleware
-   * implementation: anyone with write permissions is able to set the bucket
-   * quota. This stays in contrast to account quotas that can be set only by
-   * clients holding reseller admin privileges. */
-  op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
-  if (op_ret < 0) {
-    return;
-  }
+      /* According to the Swift's behaviour and its container_quota
+       * WSGI middleware implementation: anyone with write permissions
+       * is able to set the bucket quota. This stays in contrast to
+       * account quotas that can be set only by clients holding
+       * reseller admin privileges. */
+      op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
+      if (op_ret < 0) {
+       return op_ret;
+      }
 
-  if (swift_ver_location) {
-    s->bucket_info.swift_ver_location = *swift_ver_location;
-    s->bucket_info.swift_versioning = (! swift_ver_location->empty());
-  }
+      if (swift_ver_location) {
+       s->bucket_info.swift_ver_location = *swift_ver_location;
+       s->bucket_info.swift_versioning = (!swift_ver_location->empty());
+      }
 
-  /* Web site of Swift API. */
-  filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
-  s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
+      /* Web site of Swift API. */
+      filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
+      s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
 
-  /* Setting attributes also stores the provided bucket info. Due to this
-   * fact, the new quota settings can be serialized with the same call. */
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
+      /* Setting attributes also stores the provided bucket info. Due
+       * to this fact, the new quota settings can be serialized with
+       * the same call. */
+      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                   &s->bucket_info.objv_tracker);
+      return op_ret;
+    });
 }
 
 int RGWPutMetadataObject::verify_permission()
@@ -4017,7 +4148,11 @@ void RGWPutMetadataObject::execute()
     return;
   }
 
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    return;
+  }
+
   /* check if obj exists, read orig attrs */
   op_ret = get_obj_attrs(store, s, obj, orig_attrs);
   if (op_ret < 0) {
@@ -4205,6 +4340,9 @@ void RGWDeleteObj::execute()
       }
     }
 
+    if (op_ret == -ECANCELED) {
+      op_ret = 0;
+    }
     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
       op_ret = 0;
     }
@@ -4213,11 +4351,12 @@ void RGWDeleteObj::execute()
   }
 }
 
-
-bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name, rgw_obj_key& key)
+bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
+                                    string& bucket_name,
+                                    rgw_obj_key& key)
 {
-  string name_str;
-  string params_str;
+  boost::string_view name_str;
+  boost::string_view params_str;
 
   size_t pos = url_src.find('?');
   if (pos == string::npos) {
@@ -4227,27 +4366,27 @@ bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name,
     params_str = url_src.substr(pos + 1);
   }
 
-  std::string dec_src = url_decode(name_str);
-  const char *src = dec_src.c_str();
-
-  if (*src == '/') ++src;
-
-  string str(src);
+  boost::string_view dec_src{name_str};
+  if (dec_src[0] == '/')
+    dec_src.remove_prefix(1);
 
-  pos = str.find('/');
+  pos = dec_src.find('/');
   if (pos ==string::npos)
     return false;
 
-  bucket_name = str.substr(0, pos);
-  key.name = str.substr(pos + 1);
+  boost::string_view bn_view{dec_src.substr(0, pos)};
+  bucket_name = std::string{bn_view.data(), bn_view.size()};
+
+  boost::string_view kn_view{dec_src.substr(pos + 1)};
+  key.name = std::string{kn_view.data(), kn_view.size()};
 
   if (key.name.empty()) {
     return false;
   }
 
-  if (!params_str.empty()) {
+  if (! params_str.empty()) {
     RGWHTTPArgs args;
-    args.set(params_str);
+    args.set(params_str.to_string());
     args.parse();
 
     key.instance = args.get("versionId", NULL);
@@ -4391,7 +4530,10 @@ int RGWCopyObj::init_common()
   dest_policy.encode(aclbl);
   emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
 
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    return op_ret;
+  }
   populate_with_generic_attrs(s, attrs);
 
   return 0;
@@ -4485,7 +4627,7 @@ int RGWGetACLs::verify_permission()
                                    rgw::IAM::s3GetObjectAcl :
                                    rgw::IAM::s3GetObjectVersionAcl);
   } else {
-    perm = verify_bucket_permission(s, rgw::IAM::s3GetObjectAcl);
+    perm = verify_bucket_permission(s, rgw::IAM::s3GetBucketAcl);
   }
   if (!perm)
     return -EACCES;
@@ -4732,6 +4874,25 @@ void RGWPutLC::execute()
   RGWLCXMLParser_S3 parser(s->cct);
   RGWLifecycleConfiguration_S3 new_config(s->cct);
 
+  content_md5 = s->info.env->get("HTTP_CONTENT_MD5");
+  if (content_md5 == nullptr) {
+    op_ret = -ERR_INVALID_REQUEST;
+    s->err.message = "Missing required header for this request: Content-MD5";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    return;
+  }
+
+  std::string content_md5_bin;
+  try {
+    content_md5_bin = rgw::from_base64(boost::string_view(content_md5));
+  } catch (...) {
+    s->err.message = "Request header Content-MD5 contains character "
+                     "that is not base64 encoded.";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    op_ret = -ERR_BAD_DIGEST;
+    return;
+  }
+
   if (!parser.init()) {
     op_ret = -EINVAL;
     return;
@@ -4743,6 +4904,21 @@ void RGWPutLC::execute()
 
   ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
 
+  MD5 data_hash;
+  unsigned char data_hash_res[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  data_hash.Update(reinterpret_cast<const byte*>(data), len);
+  data_hash.Final(data_hash_res);
+
+  if (memcmp(data_hash_res, content_md5_bin.c_str(), CEPH_CRYPTO_MD5_DIGESTSIZE) != 0) {
+    op_ret = -ERR_BAD_DIGEST;
+    s->err.message = "The Content-MD5 you specified did not match what we received.";
+    ldout(s->cct, 5) << s->err.message
+                     << " Specified content md5: " << content_md5
+                     << ", calculated content md5: " << data_hash_res
+                     << dendl;
+    return;
+  }
+
   if (!parser.parse(data, len, 1)) {
     op_ret = -ERR_MALFORMED_XML;
     return;
@@ -4829,7 +5005,7 @@ void RGWDeleteLC::execute()
       }
     }
   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
-  string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+  string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;
   pair<string, int> entry(shard_id, lc_uninitial);
   string oid; 
   get_lc_oid(s, oid);
@@ -4861,11 +5037,7 @@ void RGWDeleteLC::execute()
 
 int RGWGetCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketCORS);
 }
 
 void RGWGetCORS::execute()
@@ -4883,11 +5055,7 @@ void RGWGetCORS::execute()
 
 int RGWPutCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketCORS);
 }
 
 void RGWPutCORS::execute()
@@ -4906,18 +5074,17 @@ void RGWPutCORS::execute()
     }
   }
 
-  map<string, bufferlist> attrs = s->bucket_attrs;
-  attrs[RGW_ATTR_CORS] = cors_bl;
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      map<string, bufferlist> attrs = s->bucket_attrs;
+      attrs[RGW_ATTR_CORS] = cors_bl;
+      return rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+    });
 }
 
 int RGWDeleteCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  // No separate delete permission
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketCORS);
 }
 
 void RGWDeleteCORS::execute()
@@ -4927,32 +5094,35 @@ void RGWDeleteCORS::execute()
     return;
 
   bufferlist bl;
-  rgw_raw_obj obj;
   if (!cors_exist) {
     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
     op_ret = -ENOENT;
     return;
   }
-  store->get_bucket_instance_obj(s->bucket, obj);
-  store->set_prefetch_data(s->obj_ctx, obj);
-  map<string, bufferlist> orig_attrs, attrs, rmattrs;
-  map<string, bufferlist>::iterator iter;
-
-  op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
-  if (op_ret < 0)
-    return;
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      rgw_raw_obj obj;
+      store->get_bucket_instance_obj(s->bucket, obj);
+      store->set_prefetch_data(s->obj_ctx, obj);
+      map<string, bufferlist> orig_attrs, attrs, rmattrs;
+      map<string, bufferlist>::iterator iter;
 
-  /* only remove meta attrs */
-  for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
-    const string& name = iter->first;
-    dout(10) << "DeleteCORS : attr: " << name << dendl;
-    if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
-      rmattrs[name] = iter->second;
-    } else if (attrs.find(name) == attrs.end()) {
-      attrs[name] = iter->second;
-    }
-  }
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+      op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
+      if (op_ret < 0)
+       return op_ret;
+
+      /* only remove meta attrs */
+      for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
+       const string& name = iter->first;
+       dout(10) << "DeleteCORS : attr: " << name << dendl;
+       if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
+         rmattrs[name] = iter->second;
+       } else if (attrs.find(name) == attrs.end()) {
+         attrs[name] = iter->second;
+       }
+      }
+      return rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                 &s->bucket_info.objv_tracker);
+    });
 }
 
 void RGWOptionsCORS::get_response_params(string& hdrs, string& exp_hdrs, unsigned *max_age) {
@@ -4969,6 +5139,11 @@ int RGWOptionsCORS::validate_cors_request(RGWCORSConfiguration *cc) {
   if (!validate_cors_rule_method(rule, req_meth)) {
     return -ENOENT;
   }
+
+  if (!validate_cors_rule_header(rule, req_hdrs)) {
+    return -ENOENT;
+  }
+
   return 0;
 }
 
@@ -5010,7 +5185,7 @@ void RGWOptionsCORS::execute()
 
 int RGWGetRequestPayment::verify_permission()
 {
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketRequestPayment);
 }
 
 void RGWGetRequestPayment::pre_exec()
@@ -5025,11 +5200,7 @@ void RGWGetRequestPayment::execute()
 
 int RGWSetRequestPayment::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketRequestPayment);
 }
 
 void RGWSetRequestPayment::pre_exec()
@@ -5101,7 +5272,10 @@ void RGWInitMultipart::execute()
   if (op_ret != 0)
     return;
 
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    return;
+  }
 
   do {
     char buf[33];
@@ -5276,6 +5450,27 @@ void RGWCompleteMultipart::execute()
   meta_obj.set_in_extra_data(true);
   meta_obj.index_hash_source = s->object.name;
 
+  /*take a cls lock on meta_obj to prevent racing completions (or retries)
+    from deleting the parts*/
+  rgw_pool meta_pool;
+  rgw_raw_obj raw_obj;
+  int max_lock_secs_mp =
+    s->cct->_conf->get_val<int64_t>("rgw_mp_lock_max_time");
+  utime_t dur(max_lock_secs_mp, 0);
+
+  store->obj_to_raw((s->bucket_info).placement_rule, meta_obj, &raw_obj);
+  store->get_obj_data_pool((s->bucket_info).placement_rule,
+                          meta_obj,&meta_pool);
+  store->open_pool_ctx(meta_pool, serializer.ioctx);
+
+  op_ret = serializer.try_lock(raw_obj.oid, dur);
+  if (op_ret < 0) {
+    dout(0) << "RGWCompleteMultipart::execute() failed to acquire lock " << dendl;
+    op_ret = -ERR_INTERNAL_ERROR;
+    s->err.message = "This multipart completion is already in progress";
+    return;
+  }
+
   op_ret = get_obj_attrs(store, s, meta_obj, attrs);
 
   if (op_ret < 0) {
@@ -5417,6 +5612,8 @@ void RGWCompleteMultipart::execute()
   obj_op.meta.ptag = &s->req_id; /* use req_id as operation tag */
   obj_op.meta.owner = s->owner.get_id();
   obj_op.meta.flags = PUT_OBJ_CREATE;
+  obj_op.meta.modify_tail = true;
+  obj_op.meta.completeMultipart = true;
   op_ret = obj_op.write_meta(ofs, accounted_size, attrs);
   if (op_ret < 0)
     return;
@@ -5424,9 +5621,41 @@ void RGWCompleteMultipart::execute()
   // remove the upload obj
   int r = store->delete_obj(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                            s->bucket_info, meta_obj, 0);
-  if (r < 0) {
-    ldout(store->ctx(), 0) << "WARNING: failed to remove object " << meta_obj << dendl;
+  if (r >= 0)  {
+    /* serializer's exclusive lock is released */
+    serializer.clear_locked();
+  } else {
+      ldout(store->ctx(), 0) << "WARNING: failed to remove object "
+                            << meta_obj << dendl;
+  }
+}
+
+int RGWCompleteMultipart::MPSerializer::try_lock(
+  const std::string& _oid,
+  utime_t dur)
+{
+  oid = _oid;
+  op.assert_exists();
+  lock.set_duration(dur);
+  lock.lock_exclusive(&op);
+  int ret = ioctx.operate(oid, &op);
+  if (! ret) {
+    locked = true;
+  }
+  return ret;
+}
+
+void RGWCompleteMultipart::complete()
+{
+  /* release exclusive lock iff not already */
+  if (unlikely(serializer.locked)) {
+    int r = serializer.unlock();
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "WARNING: failed to unlock "
+                            << serializer.oid << dendl;
+    }
   }
+  send_response();
 }
 
 int RGWAbortMultipart::verify_permission()
@@ -5515,7 +5744,7 @@ void RGWListMultipart::execute()
 int RGWListBucketMultiparts::verify_permission()
 {
   if (!verify_bucket_permission(s,
-                               rgw::IAM::s3ListBucketMultiPartUploads))
+                               rgw::IAM::s3ListBucketMultipartUploads))
     return -EACCES;
 
   return 0;
@@ -6702,15 +6931,15 @@ void RGWPutBucketPolicy::execute()
   }
 
   try {
-    Policy p(s->cct, s->bucket_tenant, in_data);
-    auto attrs = s->bucket_attrs;
-    attrs[RGW_ATTR_IAM_POLICY].clear();
-    attrs[RGW_ATTR_IAM_POLICY].append(p.text);
-    op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                                 &s->bucket_info.objv_tracker);
-    if (op_ret == -ECANCELED) {
-      op_ret = 0; /* lost a race, but it's ok because policies are immutable */
-    }
+    const Policy p(s->cct, s->bucket_tenant, in_data);
+    op_ret = retry_raced_bucket_write(store, s, [&p, this] {
+       auto attrs = s->bucket_attrs;
+       attrs[RGW_ATTR_IAM_POLICY].clear();
+       attrs[RGW_ATTR_IAM_POLICY].append(p.text);
+       op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                     &s->bucket_info.objv_tracker);
+       return op_ret;
+      });
   } catch (rgw::IAM::PolicyParseException& e) {
     ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
     op_ret = -EINVAL;
@@ -6778,11 +7007,18 @@ int RGWDeleteBucketPolicy::verify_permission()
 
 void RGWDeleteBucketPolicy::execute()
 {
-  auto attrs = s->bucket_attrs;
-  attrs.erase(RGW_ATTR_IAM_POLICY);
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
-  if (op_ret == -ECANCELED) {
-    op_ret = 0; /* lost a race, but it's ok because policies are immutable */
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      auto attrs = s->bucket_attrs;
+      attrs.erase(RGW_ATTR_IAM_POLICY);
+      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                   &s->bucket_info.objv_tracker);
+      return op_ret;
+    });
 }
+
+void RGWGetClusterStat::execute()
+{
+  op_ret = this->store->get_rados_handle()->cluster_stat(stats_op);
+}
+
+