]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_op.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_op.cc
index 11fdbd4ebc3fdb95863baf710c0cd52da11a9574..98a6db4703f2caa1471757c95ff06ec00a5dc096 100644 (file)
@@ -76,41 +76,42 @@ static int forward_request_to_master(struct req_state *s, obj_version *objv, RGW
 
 static MultipartMetaFilter mp_filter;
 
-static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_content)
+int RGWGetObj::parse_range(void)
 {
   int r = -ERANGE;
-  string s(range);
+  string rs(range_str);
   string ofs_str;
   string end_str;
 
-  *partial_content = false;
+  ignore_invalid_range = s->cct->_conf->rgw_ignore_get_invalid_range;
+  partial_content = false;
 
-  size_t pos = s.find("bytes=");
+  size_t pos = rs.find("bytes=");
   if (pos == string::npos) {
     pos = 0;
-    while (isspace(s[pos]))
+    while (isspace(rs[pos]))
       pos++;
     int end = pos;
-    while (isalpha(s[end]))
+    while (isalpha(rs[end]))
       end++;
-    if (strncasecmp(s.c_str(), "bytes", end - pos) != 0)
+    if (strncasecmp(rs.c_str(), "bytes", end - pos) != 0)
       return 0;
-    while (isspace(s[end]))
+    while (isspace(rs[end]))
       end++;
-    if (s[end] != '=')
+    if (rs[end] != '=')
       return 0;
-    s = s.substr(end + 1);
+    rs = rs.substr(end + 1);
   } else {
-    s = s.substr(pos + 6); /* size of("bytes=")  */
+    rs = rs.substr(pos + 6); /* size of("bytes=")  */
   }
-  pos = s.find('-');
+  pos = rs.find('-');
   if (pos == string::npos)
     goto done;
 
-  *partial_content = true;
+  partial_content = true;
 
-  ofs_str = s.substr(0, pos);
-  end_str = s.substr(pos + 1);
+  ofs_str = rs.substr(0, pos);
+  end_str = rs.substr(pos + 1);
   if (end_str.length()) {
     end = atoll(end_str.c_str());
     if (end < 0)
@@ -127,8 +128,18 @@ static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_
   if (end >= 0 && end < ofs)
     goto done;
 
-  r = 0;
+  range_parsed = true;
+  return 0;
+
 done:
+  if (ignore_invalid_range) {
+    partial_content = false;
+    ofs = 0;
+    end = -1;
+    range_parsed = false; // allow retry
+    r = 0;
+  }
+
   return r;
 }
 
@@ -585,56 +596,53 @@ rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
   rgw::IAM::Environment e;
   const auto& m = s->info.env->get_map();
   auto t = ceph::real_clock::now();
-  e.emplace(std::piecewise_construct,
-           std::forward_as_tuple("aws:CurrentTime"),
-           std::forward_as_tuple(std::to_string(
-                                   ceph::real_clock::to_time_t(t))));
-  e.emplace(std::piecewise_construct,
-           std::forward_as_tuple("aws:EpochTime"),
-           std::forward_as_tuple(ceph::to_iso_8601(t)));
+  e.emplace("aws:CurrentTime", std::to_string(ceph::real_clock::to_time_t(t)));
+  e.emplace("aws:EpochTime", ceph::to_iso_8601(t));
   // TODO: This is fine for now, but once we have STS we'll need to
   // look and see. Also this won't work with the IdentityApplier
   // model, since we need to know the actual credential.
-  e.emplace(std::piecewise_construct,
-           std::forward_as_tuple("aws:PrincipalType"),
-           std::forward_as_tuple("User"));
+  e.emplace("aws:PrincipalType", "User");
 
   auto i = m.find("HTTP_REFERER");
   if (i != m.end()) {
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:Referer"),
-             std::forward_as_tuple(i->second));
+    e.emplace("aws:Referer", i->second);
   }
 
   // These seem to be the semantics, judging from rest_rgw_s3.cc
   i = m.find("SERVER_PORT_SECURE");
   if (i != m.end()) {
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:SecureTransport"),
-             std::forward_as_tuple("true"));
+    e.emplace("aws:SecureTransport", "true");
   }
 
-  i = m.find("HTTP_HOST");
+  const auto remote_addr_param = s->cct->_conf->rgw_remote_addr_param;
+  if (remote_addr_param.length()) {
+    i = m.find(remote_addr_param);
+  } else {
+    i = m.find("REMOTE_ADDR");
+  }
   if (i != m.end()) {
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:SourceIp"),
-             std::forward_as_tuple(i->second));
+    const string* ip = &(i->second);
+    string temp;
+    if (remote_addr_param == "HTTP_X_FORWARDED_FOR") {
+      const auto comma = ip->find(',');
+      if (comma != string::npos) {
+       temp.assign(*ip, 0, comma);
+       ip = &temp;
+      }
+    }
+    e.emplace("aws:SourceIp", *ip);
   }
 
   i = m.find("HTTP_USER_AGENT"); {
   if (i != m.end())
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:UserAgent"),
-             std::forward_as_tuple(i->second));
+    e.emplace("aws:UserAgent", i->second);
   }
 
   if (s->user) {
     // What to do about aws::userid? One can have multiple access
     // keys so that isn't really suitable. Do we have a durable
     // identifier that can persist through name changes?
-    e.emplace(std::piecewise_construct,
-             std::forward_as_tuple("aws:username"),
-             std::forward_as_tuple(s->user->user_id.id));
+    e.emplace("aws:username", s->user->user_id.id);
   }
   return e;
 }
@@ -647,6 +655,37 @@ void rgw_bucket_object_pre_exec(struct req_state *s)
   dump_bucket_from_state(s);
 }
 
+// So! Now and then when we try to update bucket information, the
+// bucket has changed during the course of the operation. (Or we have
+// a cache consistency problem that Watch/Notify isn't ruling out
+// completely.)
+//
+// When this happens, we need to update the bucket info and try
+// again. We have, however, to try the right *part* again.  We can't
+// simply re-send, since that will obliterate the previous update.
+//
+// Thus, callers of this function should include everything that
+// merges information to be changed into the bucket information as
+// well as the call to set it.
+//
+// The called function must return an integer, negative on error. In
+// general, they should just return op_ret.
+namespace {
+template<typename F>
+int retry_raced_bucket_write(RGWRados* g, req_state* s, const F& f) {
+  auto r = f();
+  for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
+    r = g->try_refresh_bucket_info(s->bucket_info, nullptr,
+                                  &s->bucket_attrs);
+    if (r >= 0) {
+      r = f();
+    }
+  }
+  return r;
+}
+}
+
+
 int RGWGetObj::verify_permission()
 {
   obj = rgw_obj(s->bucket, s->object);
@@ -887,6 +926,20 @@ static bool validate_cors_rule_method(RGWCORSRule *rule, const char *req_meth) {
   return true;
 }
 
+static bool validate_cors_rule_header(RGWCORSRule *rule, const char *req_hdrs) {
+  if (req_hdrs) {
+    vector<string> hdrs;
+    get_str_vec(req_hdrs, hdrs);
+    for (const auto& hdr : hdrs) {
+      if (!rule->is_header_allowed(hdr.c_str(), hdr.length())) {
+        dout(5) << "Header " << hdr << " is not registered in this rule" << dendl;
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
 int RGWOp::read_bucket_cors()
 {
   bufferlist bl;
@@ -1546,16 +1599,13 @@ bool RGWGetObj::prefetch_data()
   bool prefetch_first_chunk = true;
   range_str = s->info.env->get("HTTP_RANGE");
 
-  if(range_str) {
-    int r = parse_range(range_str, ofs, end, &partial_content);
-    /* error on parsing the range, stop prefetch and will fail in execte() */
+  if (range_str) {
+    int r = parse_range();
+    /* error on parsing the range, stop prefetch and will fail in execute() */
     if (r < 0) {
-      range_parsed = false;
-      return false;
-    } else {
-      range_parsed = true;
+      return false; /* range_parsed==false */
     }
-    /* range get goes to shadown objects, stop prefetch */
+    /* range get goes to shadow objects, stop prefetch */
     if (ofs >= s->cct->_conf->rgw_max_chunk_size) {
       prefetch_first_chunk = false;
     }
@@ -1563,6 +1613,7 @@ bool RGWGetObj::prefetch_data()
 
   return get_data && prefetch_first_chunk;
 }
+
 void RGWGetObj::pre_exec()
 {
   rgw_bucket_object_pre_exec(s);
@@ -1642,7 +1693,9 @@ void RGWGetObj::execute()
   {
     attr_iter = attrs.find(RGW_ATTR_CRYPT_MODE);
     if (attr_iter != attrs.end() && attr_iter->second.to_str() == "SSE-C-AES256") {
-      op_ret = -ERR_INVALID_REQUEST;
+      ldout(s->cct, 0) << "ERROR: torrents are not supported for objects "
+          "encrypted with SSE-C" << dendl;
+      op_ret = -EINVAL;
       goto done_err;
     }
     torrent.init(s, store);
@@ -1768,9 +1821,9 @@ done_err:
 int RGWGetObj::init_common()
 {
   if (range_str) {
-    /* range parsed error when prefetch*/
+    /* range parsed error when prefetch */
     if (!range_parsed) {
-      int r = parse_range(range_str, ofs, end, &partial_content);
+      int r = parse_range();
       if (r < 0)
         return r;
     }
@@ -2024,16 +2077,7 @@ void RGWStatAccount::execute()
 
 int RGWGetBucketVersioning::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3GetBucketVersioning,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketVersioning);
 }
 
 void RGWGetBucketVersioning::pre_exec()
@@ -2049,16 +2093,7 @@ void RGWGetBucketVersioning::execute()
 
 int RGWSetBucketVersioning::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3PutBucketVersioning,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketVersioning);
 }
 
 void RGWSetBucketVersioning::pre_exec()
@@ -2080,15 +2115,18 @@ void RGWSetBucketVersioning::execute()
     }
   }
 
-  if (enable_versioning) {
-    s->bucket_info.flags |= BUCKET_VERSIONED;
-    s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
-  } else {
-    s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      if (enable_versioning) {
+       s->bucket_info.flags |= BUCKET_VERSIONED;
+       s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
+      } else {
+       s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
+      }
+
+      return store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                            &s->bucket_attrs);
+    });
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
-                                         &s->bucket_attrs);
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
                     << " returned err=" << op_ret << dendl;
@@ -2098,17 +2136,7 @@ void RGWSetBucketVersioning::execute()
 
 int RGWGetBucketWebsite::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3GetBucketWebsite,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketWebsite);
 }
 
 void RGWGetBucketWebsite::pre_exec()
@@ -2119,23 +2147,13 @@ void RGWGetBucketWebsite::pre_exec()
 void RGWGetBucketWebsite::execute()
 {
   if (!s->bucket_info.has_website) {
-    op_ret = -ENOENT;
+    op_ret = -ERR_NO_SUCH_WEBSITE_CONFIGURATION;
   }
 }
 
 int RGWSetBucketWebsite::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3PutBucketWebsite,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketWebsite);
 }
 
 void RGWSetBucketWebsite::pre_exec()
@@ -2158,10 +2176,14 @@ void RGWSetBucketWebsite::execute()
     }
   }
 
-  s->bucket_info.has_website = true;
-  s->bucket_info.website_conf = website_conf;
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      s->bucket_info.has_website = true;
+      s->bucket_info.website_conf = website_conf;
+      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+                                              real_time(), &s->bucket_attrs);
+      return op_ret;
+    });
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
     return;
@@ -2170,10 +2192,7 @@ void RGWSetBucketWebsite::execute()
 
 int RGWDeleteBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3DeleteBucketWebsite);
 }
 
 void RGWDeleteBucketWebsite::pre_exec()
@@ -2183,10 +2202,13 @@ void RGWDeleteBucketWebsite::pre_exec()
 
 void RGWDeleteBucketWebsite::execute()
 {
-  s->bucket_info.has_website = false;
-  s->bucket_info.website_conf = RGWBucketWebsiteConf();
-
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      s->bucket_info.has_website = false;
+      s->bucket_info.website_conf = RGWBucketWebsiteConf();
+      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+                                              real_time(), &s->bucket_attrs);
+      return op_ret;
+    });
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
     return;
@@ -2239,6 +2261,13 @@ int RGWListBucket::verify_permission()
   if (op_ret < 0) {
     return op_ret;
   }
+  if (!prefix.empty())
+    s->env.emplace("s3:prefix", prefix);
+
+  if (!delimiter.empty())
+    s->env.emplace("s3:delimiter", delimiter);
+
+  s->env.emplace("s3:max-keys", std::to_string(max));
 
   if (!verify_bucket_permission(s,
                                list_versions ?
@@ -2256,6 +2285,7 @@ int RGWListBucket::parse_max_keys()
     char *endptr;
     max = strtol(max_keys.c_str(), &endptr, 10);
     if (endptr) {
+      if (endptr == max_keys.c_str()) return -EINVAL;
       while (*endptr && isspace(*endptr)) // ignore white space
         endptr++;
       if (*endptr) {
@@ -2311,25 +2341,12 @@ void RGWListBucket::execute()
 
 int RGWGetBucketLogging::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketLogging);
 }
 
 int RGWGetBucketLocation::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3GetBucketLocation,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketLocation);
 }
 
 int RGWCreateBucket::verify_permission()
@@ -2936,7 +2953,7 @@ void RGWDeleteBucket::execute()
   }
 
   if (op_ret == 0) {
-    op_ret = rgw_unlink_bucket(store, s->user->user_id, s->bucket.tenant,
+    op_ret = rgw_unlink_bucket(store, s->bucket_info.owner, s->bucket.tenant,
                               s->bucket.name, false);
     if (op_ret < 0) {
       ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
@@ -2953,7 +2970,7 @@ void RGWDeleteBucket::execute()
 
 int RGWPutObj::verify_permission()
 {
-  if (copy_source) {
+  if (! copy_source.empty()) {
 
     RGWAccessControlPolicy cs_acl(s->cct);
     optional<Policy> policy;
@@ -2966,8 +2983,8 @@ int RGWPutObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, obj);
 
     /* check source object permissions */
-    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl, policy,
-                       cs_bucket, cs_object) < 0) {
+    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl,
+                       policy, cs_bucket, cs_object) < 0) {
       return -EACCES;
     }
 
@@ -3401,8 +3418,27 @@ void RGWPutObj::execute()
     goto done;
   }
 
+  if ((! copy_source.empty()) && !copy_source_range) {
+    rgw_obj_key obj_key(copy_source_object_name, copy_source_version_id);
+    rgw_obj obj(copy_source_bucket_info.bucket, obj_key.name);
+
+    RGWObjState *astate;
+    op_ret = store->get_obj_state(static_cast<RGWObjectCtx *>(s->obj_ctx),
+                                  copy_source_bucket_info, obj, &astate, true, false);
+    if (op_ret < 0) {
+      ldout(s->cct, 0) << "ERROR: get copy source obj state returned with error" << op_ret << dendl;
+      goto done;
+    }
+    if (!astate->exists){
+      op_ret = -ENOENT;
+      goto done;
+    }
+    lst = astate->accounted_size - 1;
+  } else {
+    lst = copy_source_range_lst;
+  }
+
   fst = copy_source_range_fst;
-  lst = copy_source_range_lst;
 
   op_ret = get_encrypt_filter(&encrypt, filter);
   if (op_ret < 0) {
@@ -3428,7 +3464,7 @@ void RGWPutObj::execute()
     bufferlist data;
     if (fst > lst)
       break;
-    if (!copy_source) {
+    if (copy_source.empty()) {
       len = get_data(data);
     } else {
       uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
@@ -4027,55 +4063,61 @@ void RGWPutMetadataBucket::execute()
     return;
   }
 
-  /* Encode special metadata first as we're using std::map::emplace under
-   * the hood. This method will add the new items only if the map doesn't
-   * contain such keys yet. */
-  if (has_policy) {
-    if (s->dialect.compare("swift") == 0) {
-       auto old_policy = \
-          static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
-       auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
-       new_policy->filter_merge(policy_rw_mask, old_policy);
-       policy = *new_policy;
-    }
-    buffer::list bl;
-    policy.encode(bl);
-    emplace_attr(RGW_ATTR_ACL, std::move(bl));
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      /* Encode special metadata first as we're using std::map::emplace under
+       * the hood. This method will add the new items only if the map doesn't
+       * contain such keys yet. */
+      if (has_policy) {
+       if (s->dialect.compare("swift") == 0) {
+         auto old_policy =                                             \
+           static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
+         auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
+         new_policy->filter_merge(policy_rw_mask, old_policy);
+         policy = *new_policy;
+       }
+       buffer::list bl;
+       policy.encode(bl);
+       emplace_attr(RGW_ATTR_ACL, std::move(bl));
+      }
 
-  if (has_cors) {
-    buffer::list bl;
-    cors_config.encode(bl);
-    emplace_attr(RGW_ATTR_CORS, std::move(bl));
-  }
+      if (has_cors) {
+       buffer::list bl;
+       cors_config.encode(bl);
+       emplace_attr(RGW_ATTR_CORS, std::move(bl));
+      }
 
-  /* It's supposed that following functions WILL NOT change any special
-   * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
-  prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
-  populate_with_generic_attrs(s, attrs);
+      /* It's supposed that following functions WILL NOT change any
+       * special attributes (like RGW_ATTR_ACL) if they are already
+       * present in attrs. */
+      prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
+      populate_with_generic_attrs(s, attrs);
 
-  /* According  to the Swift's behaviour and its container_quota WSGI middleware
-   * implementation: anyone with write permissions is able to set the bucket
-   * quota. This stays in contrast to account quotas that can be set only by
-   * clients holding reseller admin privileges. */
-  op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
-  if (op_ret < 0) {
-    return;
-  }
+      /* According to the Swift's behaviour and its container_quota
+       * WSGI middleware implementation: anyone with write permissions
+       * is able to set the bucket quota. This stays in contrast to
+       * account quotas that can be set only by clients holding
+       * reseller admin privileges. */
+      op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
+      if (op_ret < 0) {
+       return op_ret;
+      }
 
-  if (swift_ver_location) {
-    s->bucket_info.swift_ver_location = *swift_ver_location;
-    s->bucket_info.swift_versioning = (! swift_ver_location->empty());
-  }
+      if (swift_ver_location) {
+       s->bucket_info.swift_ver_location = *swift_ver_location;
+       s->bucket_info.swift_versioning = (!swift_ver_location->empty());
+      }
 
-  /* Web site of Swift API. */
-  filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
-  s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
+      /* Web site of Swift API. */
+      filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
+      s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
 
-  /* Setting attributes also stores the provided bucket info. Due to this
-   * fact, the new quota settings can be serialized with the same call. */
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
+      /* Setting attributes also stores the provided bucket info. Due
+       * to this fact, the new quota settings can be serialized with
+       * the same call. */
+      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                   &s->bucket_info.objv_tracker);
+      return op_ret;
+    });
 }
 
 int RGWPutMetadataObject::verify_permission()
@@ -4298,6 +4340,9 @@ void RGWDeleteObj::execute()
       }
     }
 
+    if (op_ret == -ECANCELED) {
+      op_ret = 0;
+    }
     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
       op_ret = 0;
     }
@@ -4306,11 +4351,12 @@ void RGWDeleteObj::execute()
   }
 }
 
-
-bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name, rgw_obj_key& key)
+bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
+                                    string& bucket_name,
+                                    rgw_obj_key& key)
 {
-  string name_str;
-  string params_str;
+  boost::string_view name_str;
+  boost::string_view params_str;
 
   size_t pos = url_src.find('?');
   if (pos == string::npos) {
@@ -4320,27 +4366,27 @@ bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name,
     params_str = url_src.substr(pos + 1);
   }
 
-  std::string dec_src = url_decode(name_str);
-  const char *src = dec_src.c_str();
-
-  if (*src == '/') ++src;
-
-  string str(src);
+  boost::string_view dec_src{name_str};
+  if (dec_src[0] == '/')
+    dec_src.remove_prefix(1);
 
-  pos = str.find('/');
+  pos = dec_src.find('/');
   if (pos ==string::npos)
     return false;
 
-  bucket_name = str.substr(0, pos);
-  key.name = str.substr(pos + 1);
+  boost::string_view bn_view{dec_src.substr(0, pos)};
+  bucket_name = std::string{bn_view.data(), bn_view.size()};
+
+  boost::string_view kn_view{dec_src.substr(pos + 1)};
+  key.name = std::string{kn_view.data(), kn_view.size()};
 
   if (key.name.empty()) {
     return false;
   }
 
-  if (!params_str.empty()) {
+  if (! params_str.empty()) {
     RGWHTTPArgs args;
-    args.set(params_str);
+    args.set(params_str.to_string());
     args.parse();
 
     key.instance = args.get("versionId", NULL);
@@ -4828,6 +4874,25 @@ void RGWPutLC::execute()
   RGWLCXMLParser_S3 parser(s->cct);
   RGWLifecycleConfiguration_S3 new_config(s->cct);
 
+  content_md5 = s->info.env->get("HTTP_CONTENT_MD5");
+  if (content_md5 == nullptr) {
+    op_ret = -ERR_INVALID_REQUEST;
+    s->err.message = "Missing required header for this request: Content-MD5";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    return;
+  }
+
+  std::string content_md5_bin;
+  try {
+    content_md5_bin = rgw::from_base64(boost::string_view(content_md5));
+  } catch (...) {
+    s->err.message = "Request header Content-MD5 contains character "
+                     "that is not base64 encoded.";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    op_ret = -ERR_BAD_DIGEST;
+    return;
+  }
+
   if (!parser.init()) {
     op_ret = -EINVAL;
     return;
@@ -4839,6 +4904,21 @@ void RGWPutLC::execute()
 
   ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
 
+  MD5 data_hash;
+  unsigned char data_hash_res[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  data_hash.Update(reinterpret_cast<const byte*>(data), len);
+  data_hash.Final(data_hash_res);
+
+  if (memcmp(data_hash_res, content_md5_bin.c_str(), CEPH_CRYPTO_MD5_DIGESTSIZE) != 0) {
+    op_ret = -ERR_BAD_DIGEST;
+    s->err.message = "The Content-MD5 you specified did not match what we received.";
+    ldout(s->cct, 5) << s->err.message
+                     << " Specified content md5: " << content_md5
+                     << ", calculated content md5: " << data_hash_res
+                     << dendl;
+    return;
+  }
+
   if (!parser.parse(data, len, 1)) {
     op_ret = -ERR_MALFORMED_XML;
     return;
@@ -4925,7 +5005,7 @@ void RGWDeleteLC::execute()
       }
     }
   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
-  string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+  string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;
   pair<string, int> entry(shard_id, lc_uninitial);
   string oid; 
   get_lc_oid(s, oid);
@@ -4957,16 +5037,7 @@ void RGWDeleteLC::execute()
 
 int RGWGetCORS::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3PutBucketCORS,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketCORS);
 }
 
 void RGWGetCORS::execute()
@@ -4984,16 +5055,7 @@ void RGWGetCORS::execute()
 
 int RGWPutCORS::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3PutBucketCORS,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketCORS);
 }
 
 void RGWPutCORS::execute()
@@ -5012,18 +5074,17 @@ void RGWPutCORS::execute()
     }
   }
 
-  map<string, bufferlist> attrs = s->bucket_attrs;
-  attrs[RGW_ATTR_CORS] = cors_bl;
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      map<string, bufferlist> attrs = s->bucket_attrs;
+      attrs[RGW_ATTR_CORS] = cors_bl;
+      return rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+    });
 }
 
 int RGWDeleteCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  // No separate delete permission
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketCORS);
 }
 
 void RGWDeleteCORS::execute()
@@ -5033,32 +5094,35 @@ void RGWDeleteCORS::execute()
     return;
 
   bufferlist bl;
-  rgw_raw_obj obj;
   if (!cors_exist) {
     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
     op_ret = -ENOENT;
     return;
   }
-  store->get_bucket_instance_obj(s->bucket, obj);
-  store->set_prefetch_data(s->obj_ctx, obj);
-  map<string, bufferlist> orig_attrs, attrs, rmattrs;
-  map<string, bufferlist>::iterator iter;
-
-  op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
-  if (op_ret < 0)
-    return;
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      rgw_raw_obj obj;
+      store->get_bucket_instance_obj(s->bucket, obj);
+      store->set_prefetch_data(s->obj_ctx, obj);
+      map<string, bufferlist> orig_attrs, attrs, rmattrs;
+      map<string, bufferlist>::iterator iter;
 
-  /* only remove meta attrs */
-  for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
-    const string& name = iter->first;
-    dout(10) << "DeleteCORS : attr: " << name << dendl;
-    if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
-      rmattrs[name] = iter->second;
-    } else if (attrs.find(name) == attrs.end()) {
-      attrs[name] = iter->second;
-    }
-  }
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+      op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
+      if (op_ret < 0)
+       return op_ret;
+
+      /* only remove meta attrs */
+      for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
+       const string& name = iter->first;
+       dout(10) << "DeleteCORS : attr: " << name << dendl;
+       if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
+         rmattrs[name] = iter->second;
+       } else if (attrs.find(name) == attrs.end()) {
+         attrs[name] = iter->second;
+       }
+      }
+      return rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                 &s->bucket_info.objv_tracker);
+    });
 }
 
 void RGWOptionsCORS::get_response_params(string& hdrs, string& exp_hdrs, unsigned *max_age) {
@@ -5075,6 +5139,11 @@ int RGWOptionsCORS::validate_cors_request(RGWCORSConfiguration *cc) {
   if (!validate_cors_rule_method(rule, req_meth)) {
     return -ENOENT;
   }
+
+  if (!validate_cors_rule_header(rule, req_hdrs)) {
+    return -ENOENT;
+  }
+
   return 0;
 }
 
@@ -5116,13 +5185,7 @@ void RGWOptionsCORS::execute()
 
 int RGWGetRequestPayment::verify_permission()
 {
-  if (s->iam_policy &&
-      s->iam_policy->eval(s->env, *s->auth.identity,
-                         rgw::IAM::s3GetBucketRequestPayment,
-                         ARN(s->bucket)) != Effect::Allow) {
-      return -EACCES;
-  }
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketRequestPayment);
 }
 
 void RGWGetRequestPayment::pre_exec()
@@ -5137,16 +5200,7 @@ void RGWGetRequestPayment::execute()
 
 int RGWSetRequestPayment::verify_permission()
 {
-  if (s->iam_policy) {
-    if (s->iam_policy->eval(s->env, *s->auth.identity,
-                           rgw::IAM::s3PutBucketRequestPayment,
-                           ARN(s->bucket)) == Effect::Allow) {
-      return 0;
-    }
-  } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return 0;
-  }
-  return -EACCES;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketRequestPayment);
 }
 
 void RGWSetRequestPayment::pre_exec()
@@ -5690,7 +5744,7 @@ void RGWListMultipart::execute()
 int RGWListBucketMultiparts::verify_permission()
 {
   if (!verify_bucket_permission(s,
-                               rgw::IAM::s3ListBucketMultiPartUploads))
+                               rgw::IAM::s3ListBucketMultipartUploads))
     return -EACCES;
 
   return 0;
@@ -6877,15 +6931,15 @@ void RGWPutBucketPolicy::execute()
   }
 
   try {
-    Policy p(s->cct, s->bucket_tenant, in_data);
-    auto attrs = s->bucket_attrs;
-    attrs[RGW_ATTR_IAM_POLICY].clear();
-    attrs[RGW_ATTR_IAM_POLICY].append(p.text);
-    op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                                 &s->bucket_info.objv_tracker);
-    if (op_ret == -ECANCELED) {
-      op_ret = 0; /* lost a race, but it's ok because policies are immutable */
-    }
+    const Policy p(s->cct, s->bucket_tenant, in_data);
+    op_ret = retry_raced_bucket_write(store, s, [&p, this] {
+       auto attrs = s->bucket_attrs;
+       attrs[RGW_ATTR_IAM_POLICY].clear();
+       attrs[RGW_ATTR_IAM_POLICY].append(p.text);
+       op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                     &s->bucket_info.objv_tracker);
+       return op_ret;
+      });
   } catch (rgw::IAM::PolicyParseException& e) {
     ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
     op_ret = -EINVAL;
@@ -6953,11 +7007,18 @@ int RGWDeleteBucketPolicy::verify_permission()
 
 void RGWDeleteBucketPolicy::execute()
 {
-  auto attrs = s->bucket_attrs;
-  attrs.erase(RGW_ATTR_IAM_POLICY);
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
-  if (op_ret == -ECANCELED) {
-    op_ret = 0; /* lost a race, but it's ok because policies are immutable */
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      auto attrs = s->bucket_attrs;
+      attrs.erase(RGW_ATTR_IAM_POLICY);
+      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                   &s->bucket_info.objv_tracker);
+      return op_ret;
+    });
+}
+
+void RGWGetClusterStat::execute()
+{
+  op_ret = this->store->get_rados_handle()->cluster_stat(stats_op);
 }
+
+