]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_op.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_op.cc
index 844b1dad5b020fdd3aedea13a20b18579f9caaca..98a6db4703f2caa1471757c95ff06ec00a5dc096 100644 (file)
@@ -3,15 +3,21 @@
 
 #include <errno.h>
 #include <stdlib.h>
+#include <system_error>
 #include <unistd.h>
 
 #include <sstream>
 
 #include <boost/algorithm/string/predicate.hpp>
+#include <boost/bind.hpp>
 #include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
+#include <boost/utility/string_view.hpp>
 
 #include "common/Clock.h"
 #include "common/armor.h"
+#include "common/backport14.h"
+#include "common/errno.h"
 #include "common/mime.h"
 #include "common/utf8.h"
 #include "common/ceph_json.h"
@@ -35,6 +41,7 @@
 #include "rgw_client_io.h"
 #include "rgw_compression.h"
 #include "rgw_role.h"
+#include "rgw_tag_s3.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/rgw/cls_rgw_client.h"
 
 using namespace std;
 using namespace librados;
 using ceph::crypto::MD5;
+using boost::optional;
+using boost::none;
 
+using rgw::IAM::ARN;
+using rgw::IAM::Effect;
+using rgw::IAM::Policy;
+
+using rgw::IAM::Policy;
 
 static string mp_ns = RGW_OBJ_NS_MULTIPART;
 static string shadow_ns = RGW_OBJ_NS_SHADOW;
@@ -62,41 +76,42 @@ static int forward_request_to_master(struct req_state *s, obj_version *objv, RGW
 
 static MultipartMetaFilter mp_filter;
 
-static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_content)
+int RGWGetObj::parse_range(void)
 {
   int r = -ERANGE;
-  string s(range);
+  string rs(range_str);
   string ofs_str;
   string end_str;
 
-  *partial_content = false;
+  ignore_invalid_range = s->cct->_conf->rgw_ignore_get_invalid_range;
+  partial_content = false;
 
-  size_t pos = s.find("bytes=");
+  size_t pos = rs.find("bytes=");
   if (pos == string::npos) {
     pos = 0;
-    while (isspace(s[pos]))
+    while (isspace(rs[pos]))
       pos++;
     int end = pos;
-    while (isalpha(s[end]))
+    while (isalpha(rs[end]))
       end++;
-    if (strncasecmp(s.c_str(), "bytes", end - pos) != 0)
+    if (strncasecmp(rs.c_str(), "bytes", end - pos) != 0)
       return 0;
-    while (isspace(s[end]))
+    while (isspace(rs[end]))
       end++;
-    if (s[end] != '=')
+    if (rs[end] != '=')
       return 0;
-    s = s.substr(end + 1);
+    rs = rs.substr(end + 1);
   } else {
-    s = s.substr(pos + 6); /* size of("bytes=")  */
+    rs = rs.substr(pos + 6); /* size of("bytes=")  */
   }
-  pos = s.find('-');
+  pos = rs.find('-');
   if (pos == string::npos)
     goto done;
 
-  *partial_content = true;
+  partial_content = true;
 
-  ofs_str = s.substr(0, pos);
-  end_str = s.substr(pos + 1);
+  ofs_str = rs.substr(0, pos);
+  end_str = rs.substr(pos + 1);
   if (end_str.length()) {
     end = atoll(end_str.c_str());
     if (end < 0)
@@ -113,8 +128,18 @@ static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_
   if (end >= 0 && end < ofs)
     goto done;
 
-  r = 0;
+  range_parsed = true;
+  return 0;
+
 done:
+  if (ignore_invalid_range) {
+    partial_content = false;
+    ofs = 0;
+    end = -1;
+    range_parsed = false; // allow retry
+    r = 0;
+  }
+
   return r;
 }
 
@@ -140,9 +165,9 @@ static int decode_policy(CephContext *cct,
 
 
 static int get_user_policy_from_attr(CephContext * const cct,
-                                     RGWRados * const store,
-                                     map<string, bufferlist>& attrs,
-                                     RGWAccessControlPolicy& policy    /* out */)
+                                    RGWRados * const store,
+                                    map<string, bufferlist>& attrs,
+                                    RGWAccessControlPolicy& policy    /* out */)
 {
   auto aiter = attrs.find(RGW_ATTR_ACL);
   if (aiter != attrs.end()) {
@@ -158,11 +183,11 @@ static int get_user_policy_from_attr(CephContext * const cct,
 }
 
 static int get_bucket_instance_policy_from_attr(CephContext *cct,
-                                       RGWRados *store,
-                                       RGWBucketInfo& bucket_info,
-                                       map<string, bufferlist>& bucket_attrs,
-                                       RGWAccessControlPolicy *policy,
-                                       rgw_raw_obj& obj)
+                                               RGWRados *store,
+                                               RGWBucketInfo& bucket_info,
+                                               map<string, bufferlist>& bucket_attrs,
+                                               RGWAccessControlPolicy *policy,
+                                               rgw_raw_obj& obj)
 {
   map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_ACL);
 
@@ -184,12 +209,12 @@ static int get_bucket_instance_policy_from_attr(CephContext *cct,
 }
 
 static int get_obj_policy_from_attr(CephContext *cct,
-                                    RGWRados *store,
-                                    RGWObjectCtx& obj_ctx,
-                                    RGWBucketInfo& bucket_info,
-                                    map<string, bufferlist>& bucket_attrs,
-                                    RGWAccessControlPolicy *policy,
-                                    rgw_obj& obj)
+                                   RGWRados *store,
+                                   RGWObjectCtx& obj_ctx,
+                                   RGWBucketInfo& bucket_info,
+                                   map<string, bufferlist>& bucket_attrs,
+                                   RGWAccessControlPolicy *policy,
+                                   rgw_obj& obj)
 {
   bufferlist bl;
   int ret = 0;
@@ -224,15 +249,27 @@ static int get_obj_policy_from_attr(CephContext *cct,
  * Returns: 0 on success, -ERR# otherwise.
  */
 static int get_bucket_policy_from_attr(CephContext *cct,
-                                RGWRados *store,
-                                RGWBucketInfo& bucket_info,
-                                map<string, bufferlist>& bucket_attrs,
-                                RGWAccessControlPolicy *policy)
+                                      RGWRados *store,
+                                      RGWBucketInfo& bucket_info,
+                                      map<string, bufferlist>& bucket_attrs,
+                                      RGWAccessControlPolicy *policy)
 {
   rgw_raw_obj instance_obj;
   store->get_bucket_instance_obj(bucket_info.bucket, instance_obj);
   return get_bucket_instance_policy_from_attr(cct, store, bucket_info, bucket_attrs,
-                                              policy, instance_obj);
+                                             policy, instance_obj);
+}
+
+static optional<Policy> get_iam_policy_from_attr(CephContext* cct,
+                                                RGWRados* store,
+                                                map<string, bufferlist>& attrs,
+                                                const string& tenant) {
+  auto i = attrs.find(RGW_ATTR_IAM_POLICY);
+  if (i != attrs.end()) {
+    return Policy(cct, tenant, i->second);
+  } else {
+    return none;
+  }
 }
 
 static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map<string, bufferlist>& attrs)
@@ -241,7 +278,6 @@ static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
-  read_op.params.perr = &s->err;
 
   return read_op.prepare();
 }
@@ -253,7 +289,6 @@ static int modify_obj_attr(RGWRados *store, struct req_state *s, rgw_obj& obj, c
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
-  read_op.params.perr = &s->err;
   
   int r = read_op.prepare();
   if (r < 0) {
@@ -305,7 +340,8 @@ static int read_obj_policy(RGWRados *store,
                            struct req_state *s,
                            RGWBucketInfo& bucket_info,
                            map<string, bufferlist>& bucket_attrs,
-                           RGWAccessControlPolicy *policy,
+                           RGWAccessControlPolicy* acl,
+                          optional<Policy>& policy,
                            rgw_bucket& bucket,
                            rgw_obj_key& object)
 {
@@ -327,9 +363,11 @@ static int read_obj_policy(RGWRados *store,
   } else {
     obj = rgw_obj(bucket, object);
   }
+  policy = get_iam_policy_from_attr(s->cct, store, bucket_attrs, bucket.tenant);
+
   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
   int ret = get_obj_policy_from_attr(s->cct, store, *obj_ctx,
-                                     bucket_info, bucket_attrs, policy, obj);
+                                     bucket_info, bucket_attrs, acl, obj);
   if (ret == -ENOENT) {
     /* object does not exist checking the bucket's ACL to make sure
        that we send a proper error code */
@@ -375,18 +413,17 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
   }
 
   if(s->dialect.compare("s3") == 0) {
-    s->bucket_acl = new RGWAccessControlPolicy_S3(s->cct);
+    s->bucket_acl = ceph::make_unique<RGWAccessControlPolicy_S3>(s->cct);
   } else if(s->dialect.compare("swift")  == 0) {
     /* We aren't allocating the account policy for those operations using
      * the Swift's infrastructure that don't really need req_state::user.
      * Typical example here is the implementation of /info. */
     if (!s->user->user_id.empty()) {
-      s->user_acl = std::unique_ptr<RGWAccessControlPolicy>(
-          new RGWAccessControlPolicy_SWIFTAcct(s->cct));
+      s->user_acl = ceph::make_unique<RGWAccessControlPolicy_SWIFTAcct>(s->cct);
     }
-    s->bucket_acl = new RGWAccessControlPolicy_SWIFT(s->cct);
+    s->bucket_acl = ceph::make_unique<RGWAccessControlPolicy_SWIFT>(s->cct);
   } else {
-    s->bucket_acl = new RGWAccessControlPolicy(s->cct);
+    s->bucket_acl = ceph::make_unique<RGWAccessControlPolicy>(s->cct);
   }
 
   /* check if copy source is within the current domain */
@@ -431,7 +468,8 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     s->bucket = s->bucket_info.bucket;
 
     if (s->bucket_exists) {
-      ret = read_bucket_policy(store, s, s->bucket_info, s->bucket_attrs, s->bucket_acl, s->bucket);
+      ret = read_bucket_policy(store, s, s->bucket_info, s->bucket_attrs,
+                               s->bucket_acl.get(), s->bucket);
       acct_acl_user = {
         s->bucket_info.owner,
         s->bucket_acl->get_owner().get_display_name(),
@@ -466,8 +504,10 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
       /* we now need to make sure that the operation actually requires copy source, that is
        * it's a copy operation
        */
-      if (store->get_zonegroup().is_master && s->system_request) {
+      if (store->get_zonegroup().is_master_zonegroup() && s->system_request) {
         /*If this is the master, don't redirect*/
+      } else if (s->op_type == RGW_OP_GET_BUCKET_LOCATION ) {
+        /* If op is get bucket location, don't redirect */
       } else if (!s->local_source ||
           (s->op != OP_PUT && s->op != OP_COPY) ||
           s->object.empty()) {
@@ -505,6 +545,16 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     }
   }
 
+  try {
+    s->iam_policy = get_iam_policy_from_attr(s->cct, store, s->bucket_attrs,
+                                            s->bucket_tenant);
+  } catch (const std::exception& e) {
+    // Really this is a can't happen condition. We parse the policy
+    // when it's given to us, so perhaps we should abort or otherwise
+    // raise bloody murder.
+    lderr(s->cct) << "Error reading IAM Policy: " << e.what() << dendl;
+    ret = -EACCES;
+  }
 
   return ret;
 }
@@ -524,7 +574,7 @@ int rgw_build_object_policies(RGWRados *store, struct req_state *s,
     if (!s->bucket_exists) {
       return -ERR_NO_SUCH_BUCKET;
     }
-    s->object_acl = new RGWAccessControlPolicy(s->cct);
+    s->object_acl = ceph::make_unique<RGWAccessControlPolicy>(s->cct);
 
     rgw_obj obj(s->bucket, s->object);
       
@@ -532,13 +582,72 @@ int rgw_build_object_policies(RGWRados *store, struct req_state *s,
     if (prefetch_data) {
       store->set_prefetch_data(s->obj_ctx, obj);
     }
-    ret = read_obj_policy(store, s, s->bucket_info, s->bucket_attrs, s->object_acl, s->bucket, s->object);
+    ret = read_obj_policy(store, s, s->bucket_info, s->bucket_attrs,
+                         s->object_acl.get(), s->iam_policy, s->bucket,
+                          s->object);
   }
 
   return ret;
 }
 
-static void rgw_bucket_object_pre_exec(struct req_state *s)
+rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
+                                               struct req_state* s)
+{
+  rgw::IAM::Environment e;
+  const auto& m = s->info.env->get_map();
+  auto t = ceph::real_clock::now();
+  e.emplace("aws:CurrentTime", std::to_string(ceph::real_clock::to_time_t(t)));
+  e.emplace("aws:EpochTime", ceph::to_iso_8601(t));
+  // TODO: This is fine for now, but once we have STS we'll need to
+  // look and see. Also this won't work with the IdentityApplier
+  // model, since we need to know the actual credential.
+  e.emplace("aws:PrincipalType", "User");
+
+  auto i = m.find("HTTP_REFERER");
+  if (i != m.end()) {
+    e.emplace("aws:Referer", i->second);
+  }
+
+  // These seem to be the semantics, judging from rest_rgw_s3.cc
+  i = m.find("SERVER_PORT_SECURE");
+  if (i != m.end()) {
+    e.emplace("aws:SecureTransport", "true");
+  }
+
+  const auto remote_addr_param = s->cct->_conf->rgw_remote_addr_param;
+  if (remote_addr_param.length()) {
+    i = m.find(remote_addr_param);
+  } else {
+    i = m.find("REMOTE_ADDR");
+  }
+  if (i != m.end()) {
+    const string* ip = &(i->second);
+    string temp;
+    if (remote_addr_param == "HTTP_X_FORWARDED_FOR") {
+      const auto comma = ip->find(',');
+      if (comma != string::npos) {
+       temp.assign(*ip, 0, comma);
+       ip = &temp;
+      }
+    }
+    e.emplace("aws:SourceIp", *ip);
+  }
+
+  i = m.find("HTTP_USER_AGENT"); {
+  if (i != m.end())
+    e.emplace("aws:UserAgent", i->second);
+  }
+
+  if (s->user) {
+    // What to do about aws::userid? One can have multiple access
+    // keys so that isn't really suitable. Do we have a durable
+    // identifier that can persist through name changes?
+    e.emplace("aws:username", s->user->user_id.id);
+  }
+  return e;
+}
+
+void rgw_bucket_object_pre_exec(struct req_state *s)
 {
   if (s->expect_cont)
     dump_continue(s);
@@ -546,6 +655,37 @@ static void rgw_bucket_object_pre_exec(struct req_state *s)
   dump_bucket_from_state(s);
 }
 
+// So! Now and then when we try to update bucket information, the
+// bucket has changed during the course of the operation. (Or we have
+// a cache consistency problem that Watch/Notify isn't ruling out
+// completely.)
+//
+// When this happens, we need to update the bucket info and try
+// again. We have, however, to try the right *part* again.  We can't
+// simply re-send, since that will obliterate the previous update.
+//
+// Thus, callers of this function should include everything that
+// merges information to be changed into the bucket information as
+// well as the call to set it.
+//
+// The called function must return an integer, negative on error. In
+// general, they should just return op_ret.
+namespace {
+template<typename F>
+int retry_raced_bucket_write(RGWRados* g, req_state* s, const F& f) {
+  auto r = f();
+  for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
+    r = g->try_refresh_bucket_info(s->bucket_info, nullptr,
+                                  &s->bucket_attrs);
+    if (r >= 0) {
+      r = f();
+    }
+  }
+  return r;
+}
+}
+
+
 int RGWGetObj::verify_permission()
 {
   obj = rgw_obj(s->bucket, s->object);
@@ -554,7 +694,21 @@ int RGWGetObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, obj);
   }
 
-  if (!verify_object_permission(s, RGW_PERM_READ)) {
+  if (torrent.get_flag()) {
+    if (obj.key.instance.empty()) {
+      action = rgw::IAM::s3GetObjectTorrent;
+    } else {
+      action = rgw::IAM::s3GetObjectVersionTorrent;
+    }
+  } else {
+    if (obj.key.instance.empty()) {
+      action = rgw::IAM::s3GetObject;
+    } else {
+      action = rgw::IAM::s3GetObjectVersion;
+    }
+  }
+
+  if (!verify_object_permission(s, action)) {
     return -EACCES;
   }
 
@@ -581,25 +735,123 @@ int RGWOp::verify_op_mask()
   return 0;
 }
 
-int RGWOp::do_aws4_auth_completion()
+int RGWGetObjTags::verify_permission()
 {
-  int ret;
+  if (!verify_object_permission(s,
+                               s->object.instance.empty() ?
+                               rgw::IAM::s3GetObjectTagging:
+                               rgw::IAM::s3GetObjectVersionTagging))
+    return -EACCES;
 
-  if (s->aws4_auth_needs_complete) {
-    /* complete */
-    ret = RGW_Auth_S3::authorize_aws4_auth_complete(store, s);
-    s->aws4_auth_needs_complete = false;
-    if (ret) {
-      return ret;
-    }
-    /* verify signature */
-    if (s->aws4_auth->signature != s->aws4_auth->new_signature) {
-      ret = -ERR_SIGNATURE_NO_MATCH;
-      ldout(s->cct, 20) << "delayed aws4 auth failed" << dendl;
-      return ret;
+  return 0;
+}
+
+void RGWGetObjTags::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWGetObjTags::execute()
+{
+  rgw_obj obj;
+  map<string,bufferlist> attrs;
+
+  obj = rgw_obj(s->bucket, s->object);
+
+  store->set_atomic(s->obj_ctx, obj);
+
+  op_ret = get_obj_attrs(store, s, obj, attrs);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "ERROR: failed to get obj attrs, obj=" << obj
+                    << " ret=" << op_ret << dendl;
+    return;
+  }
+
+  auto tags = attrs.find(RGW_ATTR_TAGS);
+  if(tags != attrs.end()){
+    has_tags = true;
+    tags_bl.append(tags->second);
+  }
+  send_response_data(tags_bl);
+}
+
+int RGWPutObjTags::verify_permission()
+{
+  if (!verify_object_permission(s,
+                               s->object.instance.empty() ?
+                               rgw::IAM::s3PutObjectTagging:
+                               rgw::IAM::s3PutObjectVersionTagging))
+    return -EACCES;
+  return 0;
+}
+
+void RGWPutObjTags::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0)
+    return;
+
+  if (s->object.empty()){
+    op_ret= -EINVAL; // we only support tagging on existing objects
+    return;
+  }
+
+  rgw_obj obj;
+  obj = rgw_obj(s->bucket, s->object);
+  store->set_atomic(s->obj_ctx, obj);
+  op_ret = modify_obj_attr(store, s, obj, RGW_ATTR_TAGS, tags_bl);
+  if (op_ret == -ECANCELED){
+    op_ret = -ERR_TAG_CONFLICT;
+  }
+}
+
+void RGWDeleteObjTags::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+
+int RGWDeleteObjTags::verify_permission()
+{
+  if (!s->object.empty()) {
+    if (!verify_object_permission(s,
+                                 s->object.instance.empty() ?
+                                 rgw::IAM::s3DeleteObjectTagging:
+                                 rgw::IAM::s3DeleteObjectVersionTagging))
+      return -EACCES;
+  }
+  return 0;
+}
+
+void RGWDeleteObjTags::execute()
+{
+  if (s->object.empty())
+    return;
+
+  rgw_obj obj;
+  obj = rgw_obj(s->bucket, s->object);
+  store->set_atomic(s->obj_ctx, obj);
+  map <string, bufferlist> attrs;
+  map <string, bufferlist> rmattr;
+  bufferlist bl;
+  rmattr[RGW_ATTR_TAGS] = bl;
+  op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, &rmattr);
+}
+
+int RGWOp::do_aws4_auth_completion()
+{
+  ldout(s->cct, 5) << "NOTICE: call to do_aws4_auth_completion"  << dendl;
+  if (s->auth.completer) {
+    if (!s->auth.completer->complete()) {
+      return -ERR_AMZ_CONTENT_SHA256_MISMATCH;
+    } else {
+      dout(10) << "v4 auth ok -- do_aws4_auth_completion" << dendl;
     }
-    /* authorization ok */
-    dout(10) << "v4 auth ok" << dendl;
+
+    /* TODO(rzarzynski): yes, we're really called twice on PUTs. Only first
+     * call passes, so we disable second one. This is old behaviour, sorry!
+     * Plan for tomorrow: seek and destroy. */
+    s->auth.completer = nullptr;
   }
 
   return 0;
@@ -674,6 +926,20 @@ static bool validate_cors_rule_method(RGWCORSRule *rule, const char *req_meth) {
   return true;
 }
 
+static bool validate_cors_rule_header(RGWCORSRule *rule, const char *req_hdrs) {
+  if (req_hdrs) {
+    vector<string> hdrs;
+    get_str_vec(req_hdrs, hdrs);
+    for (const auto& hdr : hdrs) {
+      if (!rule->is_header_allowed(hdr.c_str(), hdr.length())) {
+        dout(5) << "Header " << hdr << " is not registered in this rule" << dendl;
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
 int RGWOp::read_bucket_cors()
 {
   bufferlist bl;
@@ -794,7 +1060,8 @@ bool RGWOp::generate_cors_headers(string& origin, string& method, string& header
 
 int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
                                        const rgw_bucket_dir_entry& ent,
-                                       RGWAccessControlPolicy * const bucket_policy,
+                                       RGWAccessControlPolicy * const bucket_acl,
+                                       const optional<Policy>& bucket_policy,
                                        const off_t start_ofs,
                                        const off_t end_ofs)
 {
@@ -825,12 +1092,11 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
   read_op.conds.if_match = ent.meta.etag.c_str();
   read_op.params.attrs = &attrs;
   read_op.params.obj_size = &obj_size;
-  read_op.params.perr = &s->err;
 
   op_ret = read_op.prepare();
   if (op_ret < 0)
     return op_ret;
-  op_ret = read_op.range_to_ofs(obj_size, cur_ofs, cur_end);
+  op_ret = read_op.range_to_ofs(ent.meta.accounted_size, cur_ofs, cur_end);
   if (op_ret < 0)
     return op_ret;
   bool need_decompress;
@@ -842,7 +1108,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
 
   if (need_decompress)
   {
-    if (cs_info.orig_size != ent.meta.size) {
+    if (cs_info.orig_size != ent.meta.accounted_size) {
       // hmm.. something wrong, object not as expected, abort!
       ldout(s->cct, 0) << "ERROR: expected cs_info.orig_size=" << cs_info.orig_size <<
           ", actual read size=" << ent.meta.size << dendl;
@@ -870,8 +1136,8 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
     ldout(s->cct, 2) << "overriding permissions due to system operation" << dendl;
   } else if (s->auth.identity->is_admin_of(s->user->user_id)) {
     ldout(s->cct, 2) << "overriding permissions due to admin operation" << dendl;
-  } else if (!verify_object_permission(s, s->user_acl.get(), bucket_policy,
-                                       &obj_policy, RGW_PERM_READ)) {
+  } else if (!verify_object_permission(s, part, s->user_acl.get(), bucket_acl,
+                                      &obj_policy, bucket_policy, action)) {
     return -EPERM;
   }
 
@@ -893,13 +1159,15 @@ static int iterate_user_manifest_parts(CephContext * const cct,
                                        const off_t end,
                                        RGWBucketInfo *pbucket_info,
                                        const string& obj_prefix,
-                                       RGWAccessControlPolicy * const bucket_policy,
+                                       RGWAccessControlPolicy * const bucket_acl,
+                                       const optional<Policy>& bucket_policy,
                                        uint64_t * const ptotal_len,
                                        uint64_t * const pobj_size,
                                        string * const pobj_sum,
                                        int (*cb)(rgw_bucket& bucket,
                                                  const rgw_bucket_dir_entry& ent,
-                                                 RGWAccessControlPolicy * const bucket_policy,
+                                                 RGWAccessControlPolicy * const bucket_acl,
+                                                 const optional<Policy>& bucket_policy,
                                                  off_t start_ofs,
                                                  off_t end_ofs,
                                                  void *param),
@@ -929,21 +1197,22 @@ static int iterate_user_manifest_parts(CephContext * const cct,
     }
 
     for (rgw_bucket_dir_entry& ent : objs) {
-      uint64_t cur_total_len = obj_ofs;
-      uint64_t start_ofs = 0, end_ofs = ent.meta.size;
+      const uint64_t cur_total_len = obj_ofs;
+      const uint64_t obj_size = ent.meta.accounted_size;
+      uint64_t start_ofs = 0, end_ofs = obj_size;
 
-      if (!found_start && cur_total_len + ent.meta.size > (uint64_t)ofs) {
+      if ((ptotal_len || cb) && !found_start && cur_total_len + obj_size > (uint64_t)ofs) {
        start_ofs = ofs - obj_ofs;
        found_start = true;
       }
 
-      obj_ofs += ent.meta.size;
+      obj_ofs += obj_size;
       if (pobj_sum) {
         etag_sum.Update((const byte *)ent.meta.etag.c_str(),
                         ent.meta.etag.length());
       }
 
-      if (!found_end && obj_ofs > (uint64_t)end) {
+      if ((ptotal_len || cb) && !found_end && obj_ofs > (uint64_t)end) {
        end_ofs = end - cur_total_len + 1;
        found_end = true;
       }
@@ -955,7 +1224,7 @@ static int iterate_user_manifest_parts(CephContext * const cct,
         len_count += end_ofs - start_ofs;
 
         if (cb) {
-          r = cb(bucket, ent, bucket_policy, start_ofs, end_ofs, cb_param);
+          r = cb(bucket, ent, bucket_acl, bucket_policy, start_ofs, end_ofs, cb_param);
           if (r < 0) {
             return r;
           }
@@ -981,13 +1250,12 @@ static int iterate_user_manifest_parts(CephContext * const cct,
 }
 
 struct rgw_slo_part {
-  RGWAccessControlPolicy *bucket_policy;
+  RGWAccessControlPolicy *bucket_acl = nullptr;
+  Policy* bucket_policy = nullptr;
   rgw_bucket bucket;
   string obj_name;
-  uint64_t size;
+  uint64_t size = 0;
   string etag;
-
-  rgw_slo_part() : bucket_policy(NULL), size(0) {}
 };
 
 static int iterate_slo_parts(CephContext *cct,
@@ -997,7 +1265,8 @@ static int iterate_slo_parts(CephContext *cct,
                              map<uint64_t, rgw_slo_part>& slo_parts,
                              int (*cb)(rgw_bucket& bucket,
                                        const rgw_bucket_dir_entry& ent,
-                                       RGWAccessControlPolicy *bucket_policy,
+                                       RGWAccessControlPolicy *bucket_acl,
+                                       const optional<Policy>& bucket_policy,
                                        off_t start_ofs,
                                        off_t end_ofs,
                                        void *param),
@@ -1023,7 +1292,7 @@ static int iterate_slo_parts(CephContext *cct,
     rgw_bucket_dir_entry ent;
 
     ent.key.name = part.obj_name;
-    ent.meta.size = part.size;
+    ent.meta.accounted_size = ent.meta.size = part.size;
     ent.meta.etag = part.etag;
 
     uint64_t cur_total_len = obj_ofs;
@@ -1046,8 +1315,12 @@ static int iterate_slo_parts(CephContext *cct,
 
     if (found_start) {
       if (cb) {
-        int r = cb(part.bucket, ent, part.bucket_policy, start_ofs, end_ofs, cb_param);
-        if (r < 0)
+       // SLO is a Swift thing, and Swift has no knowledge of S3 Policies.
+        int r = cb(part.bucket, ent, part.bucket_acl,
+                  (part.bucket_policy ?
+                   optional<Policy>(*part.bucket_policy) : none),
+                  start_ofs, end_ofs, cb_param);
+       if (r < 0)
           return r;
       }
     }
@@ -1060,36 +1333,36 @@ static int iterate_slo_parts(CephContext *cct,
 
 static int get_obj_user_manifest_iterate_cb(rgw_bucket& bucket,
                                             const rgw_bucket_dir_entry& ent,
-                                            RGWAccessControlPolicy * const bucket_policy,
+                                            RGWAccessControlPolicy * const bucket_acl,
+                                            const optional<Policy>& bucket_policy,
                                             const off_t start_ofs,
                                             const off_t end_ofs,
                                             void * const param)
 {
   RGWGetObj *op = static_cast<RGWGetObj *>(param);
-  return op->read_user_manifest_part(bucket, ent, bucket_policy, start_ofs, end_ofs);
+  return op->read_user_manifest_part(bucket, ent, bucket_acl, bucket_policy, start_ofs, end_ofs);
 }
 
 int RGWGetObj::handle_user_manifest(const char *prefix)
 {
-  ldout(s->cct, 2) << "RGWGetObj::handle_user_manifest() prefix=" << prefix << dendl;
+  const boost::string_view prefix_view(prefix);
+  ldout(s->cct, 2) << "RGWGetObj::handle_user_manifest() prefix="
+                   << prefix_view << dendl;
 
-  string prefix_str = prefix;
-  size_t pos = prefix_str.find('/');
-  if (pos == string::npos)
+  const size_t pos = prefix_view.find('/');
+  if (pos == string::npos) {
     return -EINVAL;
+  }
 
-  string bucket_name_raw, bucket_name;
-  bucket_name_raw = prefix_str.substr(0, pos);
-  url_decode(bucket_name_raw, bucket_name);
-
-  string obj_prefix_raw, obj_prefix;
-  obj_prefix_raw = prefix_str.substr(pos + 1);
-  url_decode(obj_prefix_raw, obj_prefix);
+  const std::string bucket_name = url_decode(prefix_view.substr(0, pos));
+  const std::string obj_prefix = url_decode(prefix_view.substr(pos + 1));
 
   rgw_bucket bucket;
 
-  RGWAccessControlPolicy _bucket_policy(s->cct);
-  RGWAccessControlPolicy *bucket_policy;
+  RGWAccessControlPolicy _bucket_acl(s->cct);
+  RGWAccessControlPolicy *bucket_acl;
+  optional<Policy> _bucket_policy;
+  optional<Policy>* bucket_policy;
   RGWBucketInfo bucket_info;
   RGWBucketInfo *pbucket_info;
 
@@ -1106,16 +1379,20 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
     }
     bucket = bucket_info.bucket;
     pbucket_info = &bucket_info;
-    bucket_policy = &_bucket_policy;
-    r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_policy, bucket);
+    bucket_acl = &_bucket_acl;
+    r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_acl, bucket);
     if (r < 0) {
       ldout(s->cct, 0) << "failed to read bucket policy" << dendl;
       return r;
     }
+    _bucket_policy = get_iam_policy_from_attr(s->cct, store, bucket_attrs,
+                                             bucket_info.bucket.tenant);
+    bucket_policy = &_bucket_policy;
   } else {
     bucket = s->bucket;
     pbucket_info = &s->bucket_info;
-    bucket_policy = s->bucket_acl;
+    bucket_acl = s->bucket_acl.get();
+    bucket_policy = &s->iam_policy;
   }
 
   /* dry run to find out:
@@ -1123,13 +1400,26 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
    * - overall DLO's content size,
    * - md5 sum of overall DLO's content (for etag of Swift API). */
   int r = iterate_user_manifest_parts(s->cct, store, ofs, end,
-        pbucket_info, obj_prefix, bucket_policy,
-        &total_len, &s->obj_size, &lo_etag,
+        pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
+        nullptr, &s->obj_size, &lo_etag,
         nullptr /* cb */, nullptr /* cb arg */);
   if (r < 0) {
     return r;
   }
 
+  r = RGWRados::Object::Read::range_to_ofs(s->obj_size, ofs, end);
+  if (r < 0) {
+    return r;
+  }
+
+  r = iterate_user_manifest_parts(s->cct, store, ofs, end,
+        pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
+        &total_len, nullptr, nullptr,
+        nullptr, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
   if (!get_data) {
     bufferlist bl;
     send_response_data(bl, 0, 0);
@@ -1137,7 +1427,7 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
   }
 
   r = iterate_user_manifest_parts(s->cct, store, ofs, end,
-        pbucket_info, obj_prefix, bucket_policy,
+        pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
         nullptr, nullptr, nullptr,
         get_obj_user_manifest_iterate_cb, (void *)this);
   if (r < 0) {
@@ -1164,8 +1454,8 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
   }
   ldout(s->cct, 2) << "RGWGetObj::handle_slo_manifest()" << dendl;
 
-  list<RGWAccessControlPolicy> allocated_policies;
-  map<string, RGWAccessControlPolicy *> policies;
+  vector<RGWAccessControlPolicy> allocated_acls;
+  map<string, pair<RGWAccessControlPolicy *, optional<Policy>>> policies;
   map<string, rgw_bucket> buckets;
 
   map<uint64_t, rgw_slo_part> slo_parts;
@@ -1197,16 +1487,18 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
     string obj_name = path.substr(pos_sep + 1);
 
     rgw_bucket bucket;
-    RGWAccessControlPolicy *bucket_policy;
+    RGWAccessControlPolicy *bucket_acl;
+    Policy* bucket_policy;
 
     if (bucket_name.compare(s->bucket.name) != 0) {
       const auto& piter = policies.find(bucket_name);
       if (piter != policies.end()) {
-        bucket_policy = piter->second;
-        bucket = buckets[bucket_name];
+        bucket_acl = piter->second.first;
+        bucket_policy = piter->second.second.get_ptr();
+       bucket = buckets[bucket_name];
       } else {
-        allocated_policies.push_back(RGWAccessControlPolicy(s->cct));
-        RGWAccessControlPolicy& _bucket_policy = allocated_policies.back();
+       allocated_acls.push_back(RGWAccessControlPolicy(s->cct));
+       RGWAccessControlPolicy& _bucket_acl = allocated_acls.back();
 
         RGWBucketInfo bucket_info;
         map<string, bufferlist> bucket_attrs;
@@ -1220,23 +1512,28 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
           return r;
         }
         bucket = bucket_info.bucket;
-        bucket_policy = &_bucket_policy;
-        r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_policy,
+        bucket_acl = &_bucket_acl;
+        r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_acl,
                                bucket);
         if (r < 0) {
-          ldout(s->cct, 0) << "failed to read bucket policy for bucket "
+          ldout(s->cct, 0) << "failed to read bucket ACL for bucket "
                            << bucket << dendl;
           return r;
-        }
-        buckets[bucket_name] = bucket;
-        policies[bucket_name] = bucket_policy;
+       }
+       auto _bucket_policy = get_iam_policy_from_attr(
+         s->cct, store, bucket_attrs, bucket_info.bucket.tenant);
+        bucket_policy = _bucket_policy.get_ptr();
+       buckets[bucket_name] = bucket;
+        policies[bucket_name] = make_pair(bucket_acl, _bucket_policy);
       }
     } else {
       bucket = s->bucket;
-      bucket_policy = s->bucket_acl;
+      bucket_acl = s->bucket_acl.get();
+      bucket_policy = s->iam_policy.get_ptr();
     }
 
     rgw_slo_part part;
+    part.bucket_acl = bucket_acl;
     part.bucket_policy = bucket_policy;
     part.bucket = bucket;
     part.obj_name = obj_name;
@@ -1261,17 +1558,14 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
   s->obj_size = slo_info.total_size;
   ldout(s->cct, 20) << "s->obj_size=" << s->obj_size << dendl;
 
-  if (ofs < 0) {
-    ofs = total_len - std::min(-ofs, static_cast<off_t>(total_len));
-  }
-
-  if (end < 0 || end >= static_cast<off_t>(total_len)) {
-    end = total_len - 1;
+  int r = RGWRados::Object::Read::range_to_ofs(total_len, ofs, end);
+  if (r < 0) {
+    return r;
   }
 
   total_len = end - ofs + 1;
 
-  int r = iterate_slo_parts(s->cct, store, ofs, end, slo_parts,
+  r = iterate_slo_parts(s->cct, store, ofs, end, slo_parts,
         get_obj_user_manifest_iterate_cb, (void *)this);
   if (r < 0) {
     return r;
@@ -1305,16 +1599,13 @@ bool RGWGetObj::prefetch_data()
   bool prefetch_first_chunk = true;
   range_str = s->info.env->get("HTTP_RANGE");
 
-  if(range_str) {
-    int r = parse_range(range_str, ofs, end, &partial_content);
-    /* error on parsing the range, stop prefetch and will fail in execte() */
+  if (range_str) {
+    int r = parse_range();
+    /* error on parsing the range, stop prefetch and will fail in execute() */
     if (r < 0) {
-      range_parsed = false;
-      return false;
-    } else {
-      range_parsed = true;
+      return false; /* range_parsed==false */
     }
-    /* range get goes to shadown objects, stop prefetch */
+    /* range get goes to shadow objects, stop prefetch */
     if (ofs >= s->cct->_conf->rgw_max_chunk_size) {
       prefetch_first_chunk = false;
     }
@@ -1322,6 +1613,7 @@ bool RGWGetObj::prefetch_data()
 
   return get_data && prefetch_first_chunk;
 }
+
 void RGWGetObj::pre_exec()
 {
   rgw_bucket_object_pre_exec(s);
@@ -1385,11 +1677,11 @@ void RGWGetObj::execute()
   read_op.params.attrs = &attrs;
   read_op.params.lastmod = &lastmod;
   read_op.params.obj_size = &s->obj_size;
-  read_op.params.perr = &s->err;
 
   op_ret = read_op.prepare();
   if (op_ret < 0)
     goto done_err;
+  version_id = read_op.state.obj.key.instance;
 
   /* STAT ops don't need data, and do no i/o */
   if (get_type() == RGW_OP_STAT_OBJ) {
@@ -1399,8 +1691,15 @@ void RGWGetObj::execute()
   /* start gettorrent */
   if (torrent.get_flag())
   {
+    attr_iter = attrs.find(RGW_ATTR_CRYPT_MODE);
+    if (attr_iter != attrs.end() && attr_iter->second.to_str() == "SSE-C-AES256") {
+      ldout(s->cct, 0) << "ERROR: torrents are not supported for objects "
+          "encrypted with SSE-C" << dendl;
+      op_ret = -EINVAL;
+      goto done_err;
+    }
     torrent.init(s, store);
-    torrent.get_torrent_file(op_ret, read_op, total_len, bl, obj);
+    op_ret = torrent.get_torrent_file(read_op, total_len, bl, obj);
     if (op_ret < 0)
     {
       ldout(s->cct, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret
@@ -1428,17 +1727,6 @@ void RGWGetObj::execute()
       decompress.emplace(s->cct, &cs_info, partial_content, filter);
       filter = &*decompress;
   }
-  // for range requests with obj size 0
-  if (range_str && !(s->obj_size)) {
-    total_len = 0;
-    op_ret = -ERANGE;
-    goto done_err;
-  }
-
-  op_ret = read_op.range_to_ofs(s->obj_size, ofs, end);
-  if (op_ret < 0)
-    goto done_err;
-  total_len = (ofs <= end ? end + 1 - ofs : 0);
 
   attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
   if (attr_iter != attrs.end() && !skip_manifest) {
@@ -1463,6 +1751,18 @@ void RGWGetObj::execute()
     return;
   }
 
+  // for range requests with obj size 0
+  if (range_str && !(s->obj_size)) {
+    total_len = 0;
+    op_ret = -ERANGE;
+    goto done_err;
+  }
+
+  op_ret = read_op.range_to_ofs(s->obj_size, ofs, end);
+  if (op_ret < 0)
+    goto done_err;
+  total_len = (ofs <= end ? end + 1 - ofs : 0);
+
   /* Check whether the object has expired. Swift API documentation
    * stands that we should return 404 Not Found in such case. */
   if (need_object_expiration() && object_is_expired(attrs)) {
@@ -1521,9 +1821,9 @@ done_err:
 int RGWGetObj::init_common()
 {
   if (range_str) {
-    /* range parsed error when prefetch*/
+    /* range parsed error when prefetch */
     if (!range_parsed) {
-      int r = parse_range(range_str, ofs, end, &partial_content);
+      int r = parse_range();
       if (r < 0)
         return r;
     }
@@ -1567,7 +1867,7 @@ void RGWListBuckets::execute()
   bool started = false;
   uint64_t total_count = 0;
 
-  uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
+  const uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
 
   op_ret = get_params();
   if (op_ret < 0) {
@@ -1602,15 +1902,32 @@ void RGWListBuckets::execute()
                        << s->user->user_id << dendl;
       break;
     }
-    map<string, RGWBucketEnt>& m = buckets.get_buckets();
-    map<string, RGWBucketEnt>::iterator iter;
-    for (iter = m.begin(); iter != m.end(); ++iter) {
-      RGWBucketEnt& bucket = iter->second;
-      buckets_size += bucket.size;
-      buckets_size_rounded += bucket.size_rounded;
-      buckets_objcount += bucket.count;
+
+    /* We need to have stats for all our policies - even if a given policy
+     * isn't actually used in a given account. In such situation its usage
+     * stats would be simply full of zeros. */
+    for (const auto& policy : store->get_zonegroup().placement_targets) {
+      policies_stats.emplace(policy.second.name,
+                             decltype(policies_stats)::mapped_type());
+    }
+
+    std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+    for (const auto& kv : m) {
+      const auto& bucket = kv.second;
+
+      global_stats.bytes_used += bucket.size;
+      global_stats.bytes_used_rounded += bucket.size_rounded;
+      global_stats.objects_count += bucket.count;
+
+      /* operator[] still can create a new entry for storage policy seen
+       * for first time. */
+      auto& policy_stats = policies_stats[bucket.placement_rule];
+      policy_stats.bytes_used += bucket.size;
+      policy_stats.bytes_used_rounded += bucket.size_rounded;
+      policy_stats.buckets_count++;
+      policy_stats.objects_count += bucket.count;
     }
-    buckets_count += m.size();
+    global_stats.buckets_count += m.size();
     total_count += m.size();
 
     done = (m.size() < read_count || (limit >= 0 && total_count >= (uint64_t)limit));
@@ -1621,10 +1938,10 @@ void RGWListBuckets::execute()
     }
 
     if (!m.empty()) {
-      send_response_data(buckets);
-
       map<string, RGWBucketEnt>::reverse_iterator riter = m.rbegin();
       marker = riter->first;
+
+      handle_listing_chunk(std::move(buckets));
     }
   } while (is_truncated && !done);
 
@@ -1682,14 +1999,20 @@ void RGWGetUsage::execute()
   op_ret = rgw_user_sync_all_stats(store, s->user->user_id);
   if (op_ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to sync user stats: " << dendl;
+    return;
+  }
+
+  op_ret = rgw_user_get_all_buckets_stats(store, s->user->user_id, buckets_usage);
+  if (op_ret < 0) {
+    cerr << "ERROR: failed to sync user stats: " << std::endl;
     return ;
   }
-  
+
   string user_str = s->user->user_id.to_str();
   op_ret = store->cls_user_get_header(user_str, &header);
   if (op_ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: can't read user header: "  << dendl;
-    return ;
+    return;
   }
   
   return;
@@ -1722,17 +2045,31 @@ void RGWStatAccount::execute()
                        << s->user->user_id << dendl;
       break;
     } else {
-      map<string, RGWBucketEnt>& m = buckets.get_buckets();
-      map<string, RGWBucketEnt>::iterator iter;
-      for (iter = m.begin(); iter != m.end(); ++iter) {
-        RGWBucketEnt& bucket = iter->second;
-        buckets_size += bucket.size;
-        buckets_size_rounded += bucket.size_rounded;
-        buckets_objcount += bucket.count;
-
-        marker = iter->first;
+      /* We need to have stats for all our policies - even if a given policy
+       * isn't actually used in a given account. In such situation its usage
+       * stats would be simply full of zeros. */
+      for (const auto& policy : store->get_zonegroup().placement_targets) {
+        policies_stats.emplace(policy.second.name,
+                               decltype(policies_stats)::mapped_type());
+      }
+
+      std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
+      for (const auto& kv : m) {
+        const auto& bucket = kv.second;
+
+        global_stats.bytes_used += bucket.size;
+        global_stats.bytes_used_rounded += bucket.size_rounded;
+        global_stats.objects_count += bucket.count;
+
+        /* operator[] still can create a new entry for storage policy seen
+         * for first time. */
+        auto& policy_stats = policies_stats[bucket.placement_rule];
+        policy_stats.bytes_used += bucket.size;
+        policy_stats.bytes_used_rounded += bucket.size_rounded;
+        policy_stats.buckets_count++;
+        policy_stats.objects_count += bucket.count;
       }
-      buckets_count += m.size();
+      global_stats.buckets_count += m.size();
 
     }
   } while (is_truncated);
@@ -1740,11 +2077,7 @@ void RGWStatAccount::execute()
 
 int RGWGetBucketVersioning::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketVersioning);
 }
 
 void RGWGetBucketVersioning::pre_exec()
@@ -1760,11 +2093,7 @@ void RGWGetBucketVersioning::execute()
 
 int RGWSetBucketVersioning::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketVersioning);
 }
 
 void RGWSetBucketVersioning::pre_exec()
@@ -1786,15 +2115,18 @@ void RGWSetBucketVersioning::execute()
     }
   }
 
-  if (enable_versioning) {
-    s->bucket_info.flags |= BUCKET_VERSIONED;
-    s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
-  } else {
-    s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      if (enable_versioning) {
+       s->bucket_info.flags |= BUCKET_VERSIONED;
+       s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
+      } else {
+       s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
+      }
+
+      return store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                            &s->bucket_attrs);
+    });
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
-                                         &s->bucket_attrs);
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
                     << " returned err=" << op_ret << dendl;
@@ -1804,10 +2136,7 @@ void RGWSetBucketVersioning::execute()
 
 int RGWGetBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketWebsite);
 }
 
 void RGWGetBucketWebsite::pre_exec()
@@ -1818,16 +2147,13 @@ void RGWGetBucketWebsite::pre_exec()
 void RGWGetBucketWebsite::execute()
 {
   if (!s->bucket_info.has_website) {
-    op_ret = -ENOENT;
+    op_ret = -ERR_NO_SUCH_WEBSITE_CONFIGURATION;
   }
 }
 
 int RGWSetBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketWebsite);
 }
 
 void RGWSetBucketWebsite::pre_exec()
@@ -1842,10 +2168,22 @@ void RGWSetBucketWebsite::execute()
   if (op_ret < 0)
     return;
 
-  s->bucket_info.has_website = true;
-  s->bucket_info.website_conf = website_conf;
+  if (!store->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << __func__ << " forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      s->bucket_info.has_website = true;
+      s->bucket_info.website_conf = website_conf;
+      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+                                              real_time(), &s->bucket_attrs);
+      return op_ret;
+    });
 
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
     return;
@@ -1854,10 +2192,7 @@ void RGWSetBucketWebsite::execute()
 
 int RGWDeleteBucketWebsite::verify_permission()
 {
-  if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
-    return -EACCES;
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3DeleteBucketWebsite);
 }
 
 void RGWDeleteBucketWebsite::pre_exec()
@@ -1867,10 +2202,13 @@ void RGWDeleteBucketWebsite::pre_exec()
 
 void RGWDeleteBucketWebsite::execute()
 {
-  s->bucket_info.has_website = false;
-  s->bucket_info.website_conf = RGWBucketWebsiteConf();
-
-  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      s->bucket_info.has_website = false;
+      s->bucket_info.website_conf = RGWBucketWebsiteConf();
+      op_ret = store->put_bucket_instance_info(s->bucket_info, false,
+                                              real_time(), &s->bucket_attrs);
+      return op_ret;
+    });
   if (op_ret < 0) {
     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
     return;
@@ -1879,7 +2217,8 @@ void RGWDeleteBucketWebsite::execute()
 
 int RGWStatBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_READ)) {
+  // This (a HEAD request on a bucket) is governed by the s3:ListBucket permission.
+  if (!verify_bucket_permission(s, rgw::IAM::s3ListBucket)) {
     return -EACCES;
   }
 
@@ -1918,7 +2257,22 @@ void RGWStatBucket::execute()
 
 int RGWListBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_READ)) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return op_ret;
+  }
+  if (!prefix.empty())
+    s->env.emplace("s3:prefix", prefix);
+
+  if (!delimiter.empty())
+    s->env.emplace("s3:delimiter", delimiter);
+
+  s->env.emplace("s3:max-keys", std::to_string(max));
+
+  if (!verify_bucket_permission(s,
+                               list_versions ?
+                               rgw::IAM::s3ListBucketVersions :
+                               rgw::IAM::s3ListBucket)) {
     return -EACCES;
   }
 
@@ -1931,6 +2285,7 @@ int RGWListBucket::parse_max_keys()
     char *endptr;
     max = strtol(max_keys.c_str(), &endptr, 10);
     if (endptr) {
+      if (endptr == max_keys.c_str()) return -EINVAL;
       while (*endptr && isspace(*endptr)) // ignore white space
         endptr++;
       if (*endptr) {
@@ -1956,10 +2311,6 @@ void RGWListBucket::execute()
     return;
   }
 
-  op_ret = get_params();
-  if (op_ret < 0)
-    return;
-
   if (need_container_stats()) {
     map<string, RGWBucketEnt> m;
     m[s->bucket.name] = RGWBucketEnt();
@@ -1983,27 +2334,19 @@ void RGWListBucket::execute()
   list_op.params.list_versions = list_versions;
 
   op_ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
-  if (op_ret >= 0 && !delimiter.empty()) {
+  if (op_ret >= 0) {
     next_marker = list_op.get_next_marker();
   }
 }
 
 int RGWGetBucketLogging::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketLogging);
 }
 
 int RGWGetBucketLocation::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketLocation);
 }
 
 int RGWCreateBucket::verify_permission()
@@ -2261,10 +2604,33 @@ void RGWCreateBucket::execute()
   if (op_ret < 0)
     return;
 
-  if (!store->get_zonegroup().is_master &&
+  if (!location_constraint.empty() &&
+      !store->has_zonegroup_api(location_constraint)) {
+      ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
+                       << " can't be found." << dendl;
+      op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
+      s->err.message = "The specified location-constraint is not valid";
+      return;
+  }
+
+  if (!store->get_zonegroup().is_master_zonegroup() && !location_constraint.empty() &&
       store->get_zonegroup().api_name != location_constraint) {
-    ldout(s->cct, 0) << "location constraint (" << location_constraint << ") doesn't match zonegroup" << " (" << store->get_zonegroup().api_name << ")" << dendl;
-    op_ret = -EINVAL;
+    ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
+                     << " doesn't match zonegroup" << " (" << store->get_zonegroup().api_name << ")"
+                     << dendl;
+    op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
+    s->err.message = "The specified location-constraint is not valid";
+    return;
+  }
+
+  const auto& zonegroup = store->get_zonegroup();
+  if (!placement_rule.empty() &&
+      !zonegroup.placement_targets.count(placement_rule)) {
+    ldout(s->cct, 0) << "placement target (" << placement_rule << ")"
+                     << " doesn't exist in the placement targets of zonegroup"
+                     << " (" << store->get_zonegroup().api_name << ")" << dendl;
+    op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
+    s->err.message = "The specified placement target does not exist";
     return;
   }
 
@@ -2357,8 +2723,11 @@ void RGWCreateBucket::execute()
   if (need_metadata_upload()) {
     /* It's supposed that following functions WILL NOT change any special
      * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
-    rgw_get_request_metadata(s->cct, s->info, attrs, false);
-    prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
+    op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+    if (op_ret < 0) {
+      return;
+    }
+    prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
     populate_with_generic_attrs(s, attrs);
 
     op_ret = filter_out_quota_info(attrs, rmattr_names, quota_info);
@@ -2450,7 +2819,10 @@ void RGWCreateBucket::execute()
 
       attrs.clear();
 
-      rgw_get_request_metadata(s->cct, s->info, attrs, false);
+      op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+      if (op_ret < 0) {
+        return;
+      }
       prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
       populate_with_generic_attrs(s, attrs);
       op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
@@ -2482,7 +2854,7 @@ void RGWCreateBucket::execute()
 
 int RGWDeleteBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (!verify_bucket_permission(s, rgw::IAM::s3DeleteBucket)) {
     return -EACCES;
   }
 
@@ -2550,6 +2922,27 @@ void RGWDeleteBucket::execute()
     }
   }
 
+  string prefix, delimiter;
+
+  if (s->prot_flags & RGW_REST_SWIFT) {
+    string path_args;
+    path_args = s->info.args.get("path");
+    if (!path_args.empty()) {
+      if (!delimiter.empty() || !prefix.empty()) {
+        op_ret = -EINVAL;
+        return;
+      }
+      prefix = path_args;
+      delimiter="/";
+    }
+  }
+
+  op_ret = abort_bucket_multiparts(store, s->cct, s->bucket_info, prefix, delimiter);
+
+  if (op_ret < 0) {
+    return;
+  }
+
   op_ret = store->delete_bucket(s->bucket_info, ot, false);
 
   if (op_ret == -ECANCELED) {
@@ -2560,7 +2953,7 @@ void RGWDeleteBucket::execute()
   }
 
   if (op_ret == 0) {
-    op_ret = rgw_unlink_bucket(store, s->user->user_id, s->bucket.tenant,
+    op_ret = rgw_unlink_bucket(store, s->bucket_info.owner, s->bucket.tenant,
                               s->bucket.name, false);
     if (op_ret < 0) {
       ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
@@ -2577,9 +2970,10 @@ void RGWDeleteBucket::execute()
 
 int RGWPutObj::verify_permission()
 {
-  if (copy_source) {
+  if (! copy_source.empty()) {
 
-    RGWAccessControlPolicy cs_policy(s->cct);
+    RGWAccessControlPolicy cs_acl(s->cct);
+    optional<Policy> policy;
     map<string, bufferlist> cs_attrs;
     rgw_bucket cs_bucket(copy_source_bucket_info.bucket);
     rgw_obj_key cs_object(copy_source_object_name, copy_source_version_id);
@@ -2589,19 +2983,45 @@ int RGWPutObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, obj);
 
     /* check source object permissions */
-    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_policy, cs_bucket, cs_object) < 0) {
+    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl,
+                       policy, cs_bucket, cs_object) < 0) {
       return -EACCES;
     }
 
     /* admin request overrides permission checks */
-    if (! s->auth.identity->is_admin_of(cs_policy.get_owner().get_id()) &&
-        ! cs_policy.verify_permission(*s->auth.identity, s->perm_mask, RGW_PERM_READ)) {
-      return -EACCES;
+    if (! s->auth.identity->is_admin_of(cs_acl.get_owner().get_id())) {
+      if (policy) {
+       auto e = policy->eval(s->env, *s->auth.identity,
+                             cs_object.instance.empty() ?
+                             rgw::IAM::s3GetObject :
+                             rgw::IAM::s3GetObjectVersion,
+                             rgw::IAM::ARN(obj));
+       if (e == Effect::Deny) {
+         return -EACCES; 
+       } else if (e == Effect::Pass &&
+                  !cs_acl.verify_permission(*s->auth.identity, s->perm_mask,
+                                               RGW_PERM_READ)) {
+         return -EACCES;
+       }
+      } else if (!cs_acl.verify_permission(*s->auth.identity, s->perm_mask,
+                                          RGW_PERM_READ)) {
+       return -EACCES;
+      }
     }
+  }
 
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
   }
 
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -2674,17 +3094,20 @@ int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
                                               map<string, bufferlist>& attrs,
                                               real_time delete_at,
                                               const char *if_match,
-                                              const char *if_nomatch, const string *user_data)
+                                              const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace)
 {
   complete_writing_data();
 
   RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj);
+  op_target.set_versioning_disabled(true);
   RGWRados::Object::Write head_obj_op(&op_target);
 
   head_obj_op.meta.set_mtime = set_mtime;
   head_obj_op.meta.mtime = mtime;
   head_obj_op.meta.owner = s->owner.get_id();
   head_obj_op.meta.delete_at = delete_at;
+  head_obj_op.meta.zones_trace = zones_trace;
+  head_obj_op.meta.modify_tail = true;
 
   int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
   if (r < 0)
@@ -2959,6 +3382,11 @@ void RGWPutObj::execute()
       ldout(s->cct, 20) << "check_quota() returned ret=" << op_ret << dendl;
       goto done;
     }
+    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
+      goto done;
+    }
   }
 
   if (supplied_etag) {
@@ -2979,7 +3407,7 @@ void RGWPutObj::execute()
                                           s->bucket_info,
                                           obj);
     if (op_ret < 0) {
-      return;
+      goto done;
     }
   }
 
@@ -2990,8 +3418,27 @@ void RGWPutObj::execute()
     goto done;
   }
 
+  if ((! copy_source.empty()) && !copy_source_range) {
+    rgw_obj_key obj_key(copy_source_object_name, copy_source_version_id);
+    rgw_obj obj(copy_source_bucket_info.bucket, obj_key.name);
+
+    RGWObjState *astate;
+    op_ret = store->get_obj_state(static_cast<RGWObjectCtx *>(s->obj_ctx),
+                                  copy_source_bucket_info, obj, &astate, true, false);
+    if (op_ret < 0) {
+      ldout(s->cct, 0) << "ERROR: get copy source obj state returned with error" << op_ret << dendl;
+      goto done;
+    }
+    if (!astate->exists){
+      op_ret = -ENOENT;
+      goto done;
+    }
+    lst = astate->accounted_size - 1;
+  } else {
+    lst = copy_source_range_lst;
+  }
+
   fst = copy_source_range_fst;
-  lst = copy_source_range_lst;
 
   op_ret = get_encrypt_filter(&encrypt, filter);
   if (op_ret < 0) {
@@ -3014,17 +3461,17 @@ void RGWPutObj::execute()
   }
 
   do {
-    bufferlist data_in;
+    bufferlist data;
     if (fst > lst)
       break;
-    if (!copy_source) {
-      len = get_data(data_in);
+    if (copy_source.empty()) {
+      len = get_data(data);
     } else {
       uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
-      op_ret = get_data(fst, cur_lst, data_in);
+      op_ret = get_data(fst, cur_lst, data);
       if (op_ret < 0)
         goto done;
-      len = data_in.length();
+      len = data.length();
       s->content_length += len;
       fst += len;
     }
@@ -3033,19 +3480,12 @@ void RGWPutObj::execute()
       goto done;
     }
 
-    bufferlist &data = data_in;
-    if (len && s->aws4_auth_streaming_mode) {
-      /* use unwrapped data */
-      data = s->aws4_auth->bl;
-      len = data.length();
-    }
-
     if (need_calc_md5) {
       hash.Update((const byte *)data.c_str(), data.length());
     }
 
-    /* save data for producing torrent data */
-    torrent.save_data(data_in);
+    /* update torrrent */
+    torrent.update(data);
 
     /* do we need this operation to be synchronous? if we're dealing with an object with immutable
      * head, e.g., multipart object we need to make sure we're the first one writing to this object
@@ -3118,9 +3558,7 @@ void RGWPutObj::execute()
     }
   }
 
-  if (!chunked_upload &&
-      ofs != s->content_length &&
-      !s->aws4_auth_streaming_mode) {
+  if (!chunked_upload && ofs != s->content_length) {
     op_ret = -ERR_REQUEST_TIMEOUT;
     goto done;
   }
@@ -3128,30 +3566,11 @@ void RGWPutObj::execute()
 
   perfcounter->inc(l_rgw_put_b, s->obj_size);
 
-  if (s->aws4_auth_needs_complete) {
-
-    /* complete aws4 auth */
-
-    op_ret = RGW_Auth_S3::authorize_aws4_auth_complete(store, s);
-    if (op_ret) {
-      goto done;
-    }
-
-    s->aws4_auth_needs_complete = false;
-
-    /* verify signature */
-
-    if (s->aws4_auth->signature != s->aws4_auth->new_signature) {
-      op_ret = -ERR_SIGNATURE_NO_MATCH;
-      ldout(s->cct, 20) << "delayed aws4 auth failed" << dendl;
-      goto done;
-    }
-
-    /* authorization ok */
-
-    dout(10) << "v4 auth ok" << dendl;
-
+  op_ret = do_aws4_auth_completion();
+  if (op_ret < 0) {
+    goto done;
   }
+
   op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
                               user_quota, bucket_quota, s->obj_size);
   if (op_ret < 0) {
@@ -3159,6 +3578,12 @@ void RGWPutObj::execute()
     goto done;
   }
 
+  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
+    goto done;
+  }
+
   hash.Final(m);
 
   if (compressor && compressor->is_compressed()) {
@@ -3215,15 +3640,19 @@ void RGWPutObj::execute()
   emplace_attr(RGW_ATTR_ETAG, std::move(bl));
 
   populate_with_generic_attrs(s, attrs);
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    goto done;
+  }
   encode_delete_at_attr(delete_at, attrs);
+  encode_obj_tags_attr(obj_tags.get(), attrs);
 
   /* Add a custom metadata to expose the information whether an object
    * is an SLO or not. Appending the attribute must be performed AFTER
    * processing any input from user in order to prohibit overwriting. */
   if (slo_info) {
     bufferlist slo_userindicator_bl;
-    ::encode("True", slo_userindicator_bl);
+    slo_userindicator_bl.append("True", 4);
     emplace_attr(RGW_ATTR_SLO_UINDICATOR, std::move(slo_userindicator_bl));
   }
 
@@ -3236,7 +3665,7 @@ void RGWPutObj::execute()
   {
     torrent.init(s, store);
     torrent.set_create_date(mtime);
-    op_ret =  torrent.handle_data();
+    op_ret =  torrent.complete();
     if (0 != op_ret)
     {
       ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl;
@@ -3281,6 +3710,7 @@ void RGWPostObj::execute()
   RGWPutObjDataProcessor *filter = nullptr;
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
+  char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
 
   /* Read in the data from the POST form. */
   op_ret = get_params();
@@ -3293,7 +3723,18 @@ void RGWPostObj::execute()
     return;
   }
 
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Deny) {
+      op_ret = -EACCES;
+      return;
+    } else if (e == Effect::Pass && !verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
+      op_ret = -EACCES;
+      return;
+    }
+  } else if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     op_ret = -EACCES;
     return;
   }
@@ -3317,6 +3758,26 @@ void RGWPostObj::execute()
       return;
     }
 
+    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+    if (op_ret < 0) {
+      return;
+    }
+
+    if (supplied_md5_b64) {
+      char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
+      ldout(s->cct, 15) << "supplied_md5_b64=" << supplied_md5_b64 << dendl;
+      op_ret = ceph_unarmor(supplied_md5_bin, &supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1],
+                            supplied_md5_b64, supplied_md5_b64 + strlen(supplied_md5_b64));
+      ldout(s->cct, 15) << "ceph_armor ret=" << op_ret << dendl;
+      if (op_ret != CEPH_CRYPTO_MD5_DIGESTSIZE) {
+        op_ret = -ERR_INVALID_DIGEST;
+        return;
+      }
+
+      buf_to_hex((const unsigned char *)supplied_md5_bin, CEPH_CRYPTO_MD5_DIGESTSIZE, supplied_md5);
+      ldout(s->cct, 15) << "supplied_md5=" << supplied_md5 << dendl;
+    }
+
     RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                                         s->bucket_info,
                                         s->bucket,
@@ -3391,12 +3852,22 @@ void RGWPostObj::execute()
 
     s->obj_size = ofs;
 
+    if (supplied_md5_b64 && strcmp(calc_md5, supplied_md5)) {
+      op_ret = -ERR_BAD_DIGEST;
+      return;
+    }
+
     op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
                                 user_quota, bucket_quota, s->obj_size);
     if (op_ret < 0) {
       return;
     }
 
+    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+    if (op_ret < 0) {
+      return;
+    }
+
     hash.Final(m);
     buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
 
@@ -3486,7 +3957,10 @@ int RGWPutMetadataAccount::init_processing()
     attrs.emplace(RGW_ATTR_ACL, std::move(acl_bl));
   }
 
-  rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  if (op_ret < 0) {
+    return op_ret;
+  }
   prepare_add_del_attrs(orig_attrs, rmattr_names, attrs);
   populate_with_generic_attrs(s, attrs);
 
@@ -3559,7 +4033,7 @@ void RGWPutMetadataAccount::execute()
 
 int RGWPutMetadataBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -3578,7 +4052,10 @@ void RGWPutMetadataBucket::execute()
     return;
   }
 
-  rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
+  if (op_ret < 0) {
+    return;
+  }
 
   if (!placement_rule.empty() &&
       placement_rule != s->bucket_info.placement_rule) {
@@ -3586,59 +4063,68 @@ void RGWPutMetadataBucket::execute()
     return;
   }
 
-  /* Encode special metadata first as we're using std::map::emplace under
-   * the hood. This method will add the new items only if the map doesn't
-   * contain such keys yet. */
-  if (has_policy) {
-    if (s->dialect.compare("swift") == 0) {
-       auto old_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl);
-       auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
-       new_policy->filter_merge(policy_rw_mask, old_policy);
-       policy = *new_policy;
-    }
-    buffer::list bl;
-    policy.encode(bl);
-    emplace_attr(RGW_ATTR_ACL, std::move(bl));
-  }
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      /* Encode special metadata first as we're using std::map::emplace under
+       * the hood. This method will add the new items only if the map doesn't
+       * contain such keys yet. */
+      if (has_policy) {
+       if (s->dialect.compare("swift") == 0) {
+         auto old_policy =                                             \
+           static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
+         auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
+         new_policy->filter_merge(policy_rw_mask, old_policy);
+         policy = *new_policy;
+       }
+       buffer::list bl;
+       policy.encode(bl);
+       emplace_attr(RGW_ATTR_ACL, std::move(bl));
+      }
 
-  if (has_cors) {
-    buffer::list bl;
-    cors_config.encode(bl);
-    emplace_attr(RGW_ATTR_CORS, std::move(bl));
-  }
+      if (has_cors) {
+       buffer::list bl;
+       cors_config.encode(bl);
+       emplace_attr(RGW_ATTR_CORS, std::move(bl));
+      }
 
-  /* It's supposed that following functions WILL NOT change any special
-   * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
-  prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
-  populate_with_generic_attrs(s, attrs);
+      /* It's supposed that following functions WILL NOT change any
+       * special attributes (like RGW_ATTR_ACL) if they are already
+       * present in attrs. */
+      prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
+      populate_with_generic_attrs(s, attrs);
 
-  /* According  to the Swift's behaviour and its container_quota WSGI middleware
-   * implementation: anyone with write permissions is able to set the bucket
-   * quota. This stays in contrast to account quotas that can be set only by
-   * clients holding reseller admin privileges. */
-  op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
-  if (op_ret < 0) {
-    return;
-  }
+      /* According to the Swift's behaviour and its container_quota
+       * WSGI middleware implementation: anyone with write permissions
+       * is able to set the bucket quota. This stays in contrast to
+       * account quotas that can be set only by clients holding
+       * reseller admin privileges. */
+      op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
+      if (op_ret < 0) {
+       return op_ret;
+      }
 
-  if (swift_ver_location) {
-    s->bucket_info.swift_ver_location = *swift_ver_location;
-    s->bucket_info.swift_versioning = (! swift_ver_location->empty());
-  }
+      if (swift_ver_location) {
+       s->bucket_info.swift_ver_location = *swift_ver_location;
+       s->bucket_info.swift_versioning = (!swift_ver_location->empty());
+      }
 
-  /* Web site of Swift API. */
-  filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
-  s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
+      /* Web site of Swift API. */
+      filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
+      s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
 
-  /* Setting attributes also stores the provided bucket info. Due to this
-   * fact, the new quota settings can be serialized with the same call. */
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
-                               &s->bucket_info.objv_tracker);
+      /* Setting attributes also stores the provided bucket info. Due
+       * to this fact, the new quota settings can be serialized with
+       * the same call. */
+      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                   &s->bucket_info.objv_tracker);
+      return op_ret;
+    });
 }
 
 int RGWPutMetadataObject::verify_permission()
 {
-  if (!verify_object_permission(s, RGW_PERM_WRITE)) {
+  // This looks to be something specific to Swift. We could add
+  // operations like swift:PutMetadataObject to the Policy Engine.
+  if (!verify_object_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -3662,7 +4148,11 @@ void RGWPutMetadataObject::execute()
     return;
   }
 
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    return;
+  }
+
   /* check if obj exists, read orig attrs */
   op_ret = get_obj_attrs(store, s, obj, orig_attrs);
   if (op_ret < 0) {
@@ -3715,20 +4205,14 @@ int RGWDeleteObj::handle_slo_manifest(bufferlist& bl)
     const string& path_str = iter.path;
 
     const size_t sep_pos = path_str.find('/', 1 /* skip first slash */);
-    if (string::npos == sep_pos) {
+    if (boost::string_view::npos == sep_pos) {
       return -EINVAL;
     }
 
     RGWBulkDelete::acct_path_t path;
 
-    string bucket_name;
-    url_decode(path_str.substr(1, sep_pos - 1), bucket_name);
-
-    string obj_name;
-    url_decode(path_str.substr(sep_pos + 1), obj_name);
-
-    path.bucket_name = bucket_name;
-    path.obj_key = obj_name;
+    path.bucket_name = url_decode(path_str.substr(1, sep_pos - 1));
+    path.obj_key = url_decode(path_str.substr(sep_pos + 1));
 
     items.push_back(path);
   }
@@ -3749,7 +4233,19 @@ int RGWDeleteObj::handle_slo_manifest(bufferlist& bl)
 
 int RGWDeleteObj::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (s->iam_policy) {
+    auto r = s->iam_policy->eval(s->env, *s->auth.identity,
+                                s->object.instance.empty() ?
+                                rgw::IAM::s3DeleteObject :
+                                rgw::IAM::s3DeleteObjectVersion,
+                                ARN(s->bucket, s->object.name));
+    if (r == Effect::Allow)
+      return true;
+    else if (r == Effect::Deny)
+      return false;
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -3844,6 +4340,9 @@ void RGWDeleteObj::execute()
       }
     }
 
+    if (op_ret == -ECANCELED) {
+      op_ret = 0;
+    }
     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
       op_ret = 0;
     }
@@ -3852,11 +4351,12 @@ void RGWDeleteObj::execute()
   }
 }
 
-
-bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name, rgw_obj_key& key)
+bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
+                                    string& bucket_name,
+                                    rgw_obj_key& key)
 {
-  string name_str;
-  string params_str;
+  boost::string_view name_str;
+  boost::string_view params_str;
 
   size_t pos = url_src.find('?');
   if (pos == string::npos) {
@@ -3866,29 +4366,27 @@ bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name,
     params_str = url_src.substr(pos + 1);
   }
 
-  string dec_src;
-
-  url_decode(name_str, dec_src);
-  const char *src = dec_src.c_str();
-
-  if (*src == '/') ++src;
-
-  string str(src);
+  boost::string_view dec_src{name_str};
+  if (dec_src[0] == '/')
+    dec_src.remove_prefix(1);
 
-  pos = str.find('/');
+  pos = dec_src.find('/');
   if (pos ==string::npos)
     return false;
 
-  bucket_name = str.substr(0, pos);
-  key.name = str.substr(pos + 1);
+  boost::string_view bn_view{dec_src.substr(0, pos)};
+  bucket_name = std::string{bn_view.data(), bn_view.size()};
+
+  boost::string_view kn_view{dec_src.substr(pos + 1)};
+  key.name = std::string{kn_view.data(), kn_view.size()};
 
   if (key.name.empty()) {
     return false;
   }
 
-  if (!params_str.empty()) {
+  if (! params_str.empty()) {
     RGWHTTPArgs args;
-    args.set(params_str);
+    args.set(params_str.to_string());
     args.parse();
 
     key.instance = args.get("versionId", NULL);
@@ -3899,7 +4397,8 @@ bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name,
 
 int RGWCopyObj::verify_permission()
 {
-  RGWAccessControlPolicy src_policy(s->cct);
+  RGWAccessControlPolicy src_acl(s->cct);
+  optional<Policy> src_policy;
   op_ret = get_params();
   if (op_ret < 0)
     return op_ret;
@@ -3934,17 +4433,32 @@ int RGWCopyObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, src_obj);
 
     /* check source object permissions */
-    op_ret = read_obj_policy(store, s, src_bucket_info, src_attrs, &src_policy,
-                         src_bucket, src_object);
+    op_ret = read_obj_policy(store, s, src_bucket_info, src_attrs, &src_acl,
+                            src_policy, src_bucket, src_object);
     if (op_ret < 0) {
       return op_ret;
     }
 
     /* admin request overrides permission checks */
-    if (! s->auth.identity->is_admin_of(src_policy.get_owner().get_id()) &&
-        ! src_policy.verify_permission(*s->auth.identity, s->perm_mask,
-                                       RGW_PERM_READ)) {
-      return -EACCES;
+    if (!s->auth.identity->is_admin_of(src_acl.get_owner().get_id())) {
+      if (src_policy) {
+       auto e = src_policy->eval(s->env, *s->auth.identity,
+                                 src_object.instance.empty() ?
+                                 rgw::IAM::s3GetObject :
+                                 rgw::IAM::s3GetObjectVersion,
+                                 ARN(src_obj));
+       if (e == Effect::Deny) {
+         return -EACCES;
+       } else if (e == Effect::Pass &&
+                  !src_acl.verify_permission(*s->auth.identity, s->perm_mask,
+                                             RGW_PERM_READ)) { 
+         return -EACCES;
+       }
+      } else if (!src_acl.verify_permission(*s->auth.identity,
+                                              s->perm_mask,
+                                           RGW_PERM_READ)) {
+       return -EACCES;
+      }
     }
   }
 
@@ -4016,7 +4530,10 @@ int RGWCopyObj::init_common()
   dest_policy.encode(aclbl);
   emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
 
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    return op_ret;
+  }
   populate_with_generic_attrs(s, attrs);
 
   return 0;
@@ -4097,7 +4614,6 @@ void RGWCopyObj::execute()
                           (version_id.empty() ? NULL : &version_id),
                           &s->req_id, /* use req_id as tag */
                           &etag,
-                          &s->err,
                           copy_obj_progress_cb, (void *)this
     );
 }
@@ -4106,9 +4622,12 @@ int RGWGetACLs::verify_permission()
 {
   bool perm;
   if (!s->object.empty()) {
-    perm = verify_object_permission(s, RGW_PERM_READ_ACP);
+    perm = verify_object_permission(s,
+                                   s->object.instance.empty() ?
+                                   rgw::IAM::s3GetObjectAcl :
+                                   rgw::IAM::s3GetObjectVersionAcl);
   } else {
-    perm = verify_bucket_permission(s, RGW_PERM_READ_ACP);
+    perm = verify_bucket_permission(s, rgw::IAM::s3GetBucketAcl);
   }
   if (!perm)
     return -EACCES;
@@ -4124,8 +4643,10 @@ void RGWGetACLs::pre_exec()
 void RGWGetACLs::execute()
 {
   stringstream ss;
-  RGWAccessControlPolicy *acl = (!s->object.empty() ? s->object_acl : s->bucket_acl);
-  RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(acl);
+  RGWAccessControlPolicy* const acl = \
+    (!s->object.empty() ? s->object_acl.get() : s->bucket_acl.get());
+  RGWAccessControlPolicy_S3* const s3policy = \
+    static_cast<RGWAccessControlPolicy_S3*>(acl);
   s3policy->to_xml(ss);
   acls = ss.str();
 }
@@ -4136,9 +4657,12 @@ int RGWPutACLs::verify_permission()
 {
   bool perm;
   if (!s->object.empty()) {
-    perm = verify_object_permission(s, RGW_PERM_WRITE_ACP);
+    perm = verify_object_permission(s,
+                                   s->object.instance.empty() ?
+                                   rgw::IAM::s3PutObjectAcl :
+                                   rgw::IAM::s3PutObjectVersionAcl);
   } else {
-    perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP);
+    perm = verify_bucket_permission(s, rgw::IAM::s3PutBucketAcl);
   }
   if (!perm)
     return -EACCES;
@@ -4149,7 +4673,7 @@ int RGWPutACLs::verify_permission()
 int RGWGetLC::verify_permission()
 {
   bool perm;
-  perm = verify_bucket_permission(s, RGW_PERM_READ_ACP);
+  perm = verify_bucket_permission(s, rgw::IAM::s3GetLifecycleConfiguration);
   if (!perm)
     return -EACCES;
 
@@ -4159,7 +4683,7 @@ int RGWGetLC::verify_permission()
 int RGWPutLC::verify_permission()
 {
   bool perm;
-  perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP);
+  perm = verify_bucket_permission(s, rgw::IAM::s3PutLifecycleConfiguration);
   if (!perm)
     return -EACCES;
 
@@ -4169,7 +4693,7 @@ int RGWPutLC::verify_permission()
 int RGWDeleteLC::verify_permission()
 {
   bool perm;
-  perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP);
+  perm = verify_bucket_permission(s, rgw::IAM::s3PutLifecycleConfiguration);
   if (!perm)
     return -EACCES;
 
@@ -4215,13 +4739,23 @@ void RGWPutACLs::execute()
   }
 
 
-  RGWAccessControlPolicy *existing_policy = (s->object.empty() ? s->bucket_acl : s->object_acl);
+  RGWAccessControlPolicy* const existing_policy = \
+    (s->object.empty() ? s->bucket_acl.get() : s->object_acl.get());
 
   owner = existing_policy->get_owner();
 
   op_ret = get_params();
-  if (op_ret < 0)
+  if (op_ret < 0) {
+    if (op_ret == -ERANGE) {
+      ldout(s->cct, 4) << "The size of request xml data is larger than the max limitation, data size = "
+                       << s->length << dendl;
+      op_ret = -ERR_MALFORMED_XML;
+      s->err.message = "The XML you provided was larger than the maximum " +
+                       std::to_string(s->cct->_conf->rgw_max_put_param_size) +
+                       " bytes allowed.";
+    }
     return;
+  }
 
   ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
 
@@ -4251,6 +4785,27 @@ void RGWPutACLs::execute()
     return;
   }
 
+  const RGWAccessControlList& req_acl = policy->get_acl();
+  const multimap<string, ACLGrant>& req_grant_map = req_acl.get_grant_map();
+#define ACL_GRANTS_MAX_NUM      100
+  int max_num = s->cct->_conf->rgw_acl_grants_max_num;
+  if (max_num < 0) {
+    max_num = ACL_GRANTS_MAX_NUM;
+  }
+
+  int grants_num = req_grant_map.size();
+  if (grants_num > max_num) {
+    ldout(s->cct, 4) << "An acl can have up to "
+                     << max_num
+                     << " grants, request acl grants num: "
+                     << grants_num << dendl;
+    op_ret = -ERR_MALFORMED_ACL_ERROR;
+    s->err.message = "The request is rejected, because the acl grants number you requested is larger than the maximum "
+                     + std::to_string(max_num)
+                     + " grants allowed in an acl.";
+    return;
+  }
+
   // forward bucket acl requests to meta master zone
   if (s->object.empty() && !store->is_meta_master()) {
     bufferlist in_data;
@@ -4319,6 +4874,25 @@ void RGWPutLC::execute()
   RGWLCXMLParser_S3 parser(s->cct);
   RGWLifecycleConfiguration_S3 new_config(s->cct);
 
+  content_md5 = s->info.env->get("HTTP_CONTENT_MD5");
+  if (content_md5 == nullptr) {
+    op_ret = -ERR_INVALID_REQUEST;
+    s->err.message = "Missing required header for this request: Content-MD5";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    return;
+  }
+
+  std::string content_md5_bin;
+  try {
+    content_md5_bin = rgw::from_base64(boost::string_view(content_md5));
+  } catch (...) {
+    s->err.message = "Request header Content-MD5 contains character "
+                     "that is not base64 encoded.";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    op_ret = -ERR_BAD_DIGEST;
+    return;
+  }
+
   if (!parser.init()) {
     op_ret = -EINVAL;
     return;
@@ -4330,6 +4904,21 @@ void RGWPutLC::execute()
 
   ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
 
+  MD5 data_hash;
+  unsigned char data_hash_res[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  data_hash.Update(reinterpret_cast<const byte*>(data), len);
+  data_hash.Final(data_hash_res);
+
+  if (memcmp(data_hash_res, content_md5_bin.c_str(), CEPH_CRYPTO_MD5_DIGESTSIZE) != 0) {
+    op_ret = -ERR_BAD_DIGEST;
+    s->err.message = "The Content-MD5 you specified did not match what we received.";
+    ldout(s->cct, 5) << s->err.message
+                     << " Specified content md5: " << content_md5
+                     << ", calculated content md5: " << data_hash_res
+                     << dendl;
+    return;
+  }
+
   if (!parser.parse(data, len, 1)) {
     op_ret = -ERR_MALFORMED_XML;
     return;
@@ -4416,7 +5005,7 @@ void RGWDeleteLC::execute()
       }
     }
   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
-  string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+  string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;
   pair<string, int> entry(shard_id, lc_uninitial);
   string oid; 
   get_lc_oid(s, oid);
@@ -4448,11 +5037,7 @@ void RGWDeleteLC::execute()
 
 int RGWGetCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketCORS);
 }
 
 void RGWGetCORS::execute()
@@ -4470,11 +5055,7 @@ void RGWGetCORS::execute()
 
 int RGWPutCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketCORS);
 }
 
 void RGWPutCORS::execute()
@@ -4485,18 +5066,25 @@ void RGWPutCORS::execute()
   if (op_ret < 0)
     return;
 
-  map<string, bufferlist> attrs = s->bucket_attrs;
-  attrs[RGW_ATTR_CORS] = cors_bl;
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+  if (!store->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << __func__ << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      map<string, bufferlist> attrs = s->bucket_attrs;
+      attrs[RGW_ATTR_CORS] = cors_bl;
+      return rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+    });
 }
 
 int RGWDeleteCORS::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  // No separate delete permission
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketCORS);
 }
 
 void RGWDeleteCORS::execute()
@@ -4506,32 +5094,35 @@ void RGWDeleteCORS::execute()
     return;
 
   bufferlist bl;
-  rgw_raw_obj obj;
   if (!cors_exist) {
     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
     op_ret = -ENOENT;
     return;
   }
-  store->get_bucket_instance_obj(s->bucket, obj);
-  store->set_prefetch_data(s->obj_ctx, obj);
-  map<string, bufferlist> orig_attrs, attrs, rmattrs;
-  map<string, bufferlist>::iterator iter;
-
-  op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
-  if (op_ret < 0)
-    return;
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      rgw_raw_obj obj;
+      store->get_bucket_instance_obj(s->bucket, obj);
+      store->set_prefetch_data(s->obj_ctx, obj);
+      map<string, bufferlist> orig_attrs, attrs, rmattrs;
+      map<string, bufferlist>::iterator iter;
 
-  /* only remove meta attrs */
-  for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
-    const string& name = iter->first;
-    dout(10) << "DeleteCORS : attr: " << name << dendl;
-    if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
-      rmattrs[name] = iter->second;
-    } else if (attrs.find(name) == attrs.end()) {
-      attrs[name] = iter->second;
-    }
-  }
-  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
+      op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
+      if (op_ret < 0)
+       return op_ret;
+
+      /* only remove meta attrs */
+      for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
+       const string& name = iter->first;
+       dout(10) << "DeleteCORS : attr: " << name << dendl;
+       if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
+         rmattrs[name] = iter->second;
+       } else if (attrs.find(name) == attrs.end()) {
+         attrs[name] = iter->second;
+       }
+      }
+      return rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                 &s->bucket_info.objv_tracker);
+    });
 }
 
 void RGWOptionsCORS::get_response_params(string& hdrs, string& exp_hdrs, unsigned *max_age) {
@@ -4548,6 +5139,11 @@ int RGWOptionsCORS::validate_cors_request(RGWCORSConfiguration *cc) {
   if (!validate_cors_rule_method(rule, req_meth)) {
     return -ENOENT;
   }
+
+  if (!validate_cors_rule_header(rule, req_hdrs)) {
+    return -ENOENT;
+  }
+
   return 0;
 }
 
@@ -4589,7 +5185,7 @@ void RGWOptionsCORS::execute()
 
 int RGWGetRequestPayment::verify_permission()
 {
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3GetBucketRequestPayment);
 }
 
 void RGWGetRequestPayment::pre_exec()
@@ -4604,11 +5200,7 @@ void RGWGetRequestPayment::execute()
 
 int RGWSetRequestPayment::verify_permission()
 {
-  if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
-    return -EACCES;
-  }
-
-  return 0;
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketRequestPayment);
 }
 
 void RGWSetRequestPayment::pre_exec()
@@ -4635,8 +5227,20 @@ void RGWSetRequestPayment::execute()
 
 int RGWInitMultipart::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
+  }
 
   return 0;
 }
@@ -4668,7 +5272,10 @@ void RGWInitMultipart::execute()
   if (op_ret != 0)
     return;
 
-  rgw_get_request_metadata(s->cct, s->info, attrs);
+  op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+  if (op_ret < 0) {
+    return;
+  }
 
   do {
     char buf[33];
@@ -4740,8 +5347,20 @@ static int get_multipart_info(RGWRados *store, struct req_state *s,
 
 int RGWCompleteMultipart::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
+  }
 
   return 0;
 }
@@ -4831,6 +5450,27 @@ void RGWCompleteMultipart::execute()
   meta_obj.set_in_extra_data(true);
   meta_obj.index_hash_source = s->object.name;
 
+  /*take a cls lock on meta_obj to prevent racing completions (or retries)
+    from deleting the parts*/
+  rgw_pool meta_pool;
+  rgw_raw_obj raw_obj;
+  int max_lock_secs_mp =
+    s->cct->_conf->get_val<int64_t>("rgw_mp_lock_max_time");
+  utime_t dur(max_lock_secs_mp, 0);
+
+  store->obj_to_raw((s->bucket_info).placement_rule, meta_obj, &raw_obj);
+  store->get_obj_data_pool((s->bucket_info).placement_rule,
+                          meta_obj,&meta_pool);
+  store->open_pool_ctx(meta_pool, serializer.ioctx);
+
+  op_ret = serializer.try_lock(raw_obj.oid, dur);
+  if (op_ret < 0) {
+    dout(0) << "RGWCompleteMultipart::execute() failed to acquire lock " << dendl;
+    op_ret = -ERR_INTERNAL_ERROR;
+    s->err.message = "This multipart completion is already in progress";
+    return;
+  }
+
   op_ret = get_obj_attrs(store, s, meta_obj, attrs);
 
   if (op_ret < 0) {
@@ -4907,7 +5547,7 @@ void RGWCompleteMultipart::execute()
           op_ret = -ERR_INVALID_PART;
           return;
         }
-        int new_ofs; // offset in compression data for new part
+        int64_t new_ofs; // offset in compression data for new part
         if (cs_info.blocks.size() > 0)
           new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
         else
@@ -4972,6 +5612,8 @@ void RGWCompleteMultipart::execute()
   obj_op.meta.ptag = &s->req_id; /* use req_id as operation tag */
   obj_op.meta.owner = s->owner.get_id();
   obj_op.meta.flags = PUT_OBJ_CREATE;
+  obj_op.meta.modify_tail = true;
+  obj_op.meta.completeMultipart = true;
   op_ret = obj_op.write_meta(ofs, accounted_size, attrs);
   if (op_ret < 0)
     return;
@@ -4979,15 +5621,59 @@ void RGWCompleteMultipart::execute()
   // remove the upload obj
   int r = store->delete_obj(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                            s->bucket_info, meta_obj, 0);
-  if (r < 0) {
-    ldout(store->ctx(), 0) << "WARNING: failed to remove object " << meta_obj << dendl;
+  if (r >= 0)  {
+    /* serializer's exclusive lock is released */
+    serializer.clear_locked();
+  } else {
+      ldout(store->ctx(), 0) << "WARNING: failed to remove object "
+                            << meta_obj << dendl;
+  }
+}
+
+int RGWCompleteMultipart::MPSerializer::try_lock(
+  const std::string& _oid,
+  utime_t dur)
+{
+  oid = _oid;
+  op.assert_exists();
+  lock.set_duration(dur);
+  lock.lock_exclusive(&op);
+  int ret = ioctx.operate(oid, &op);
+  if (! ret) {
+    locked = true;
+  }
+  return ret;
+}
+
+void RGWCompleteMultipart::complete()
+{
+  /* release exclusive lock iff not already */
+  if (unlikely(serializer.locked)) {
+    int r = serializer.unlock();
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "WARNING: failed to unlock "
+                            << serializer.oid << dendl;
+    }
   }
+  send_response();
 }
 
 int RGWAbortMultipart::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3AbortMultipartUpload,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
+  }
 
   return 0;
 }
@@ -5023,7 +5709,7 @@ void RGWAbortMultipart::execute()
 
 int RGWListMultipart::verify_permission()
 {
-  if (!verify_object_permission(s, RGW_PERM_READ))
+  if (!verify_object_permission(s, rgw::IAM::s3ListMultipartUploadParts))
     return -EACCES;
 
   return 0;
@@ -5057,7 +5743,8 @@ void RGWListMultipart::execute()
 
 int RGWListBucketMultiparts::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_READ))
+  if (!verify_bucket_permission(s,
+                               rgw::IAM::s3ListBucketMultipartUploads))
     return -EACCES;
 
   return 0;
@@ -5091,17 +5778,12 @@ void RGWListBucketMultiparts::execute()
   }
   marker_meta = marker.get_meta();
 
-  RGWRados::Bucket target(store, s->bucket_info);
-  RGWRados::Bucket::List list_op(&target);
-
-  list_op.params.prefix = prefix;
-  list_op.params.delim = delimiter;
-  list_op.params.marker = marker_meta;
-  list_op.params.ns = mp_ns;
-  list_op.params.filter = &mp_filter;
+  op_ret = list_bucket_multiparts(store, s->bucket_info, prefix, marker_meta, delimiter,
+                                  max_uploads, &objs, &common_prefixes, &is_truncated);
+  if (op_ret < 0) {
+    return;
+  }
 
-  op_ret = list_op.list_objects(max_uploads, &objs, &common_prefixes,
-                               &is_truncated);
   if (!objs.empty()) {
     vector<rgw_bucket_dir_entry>::iterator iter;
     RGWMultipartUploadEntry entry;
@@ -5129,7 +5811,8 @@ void RGWGetHealthCheck::execute()
 
 int RGWDeleteMultiObj::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  acl_allowed = verify_bucket_permission_no_policy(s, RGW_PERM_WRITE);
+  if (!acl_allowed && !s->iam_policy)
     return -EACCES;
 
   return 0;
@@ -5186,6 +5869,19 @@ void RGWDeleteMultiObj::execute()
         iter != multi_delete->objects.end() && num_processed < max_to_delete;
         ++iter, num_processed++) {
     rgw_obj obj(bucket, *iter);
+    if (s->iam_policy) {
+      auto e = s->iam_policy->eval(s->env,
+                                  *s->auth.identity,
+                                  iter->instance.empty() ?
+                                  rgw::IAM::s3DeleteObject :
+                                  rgw::IAM::s3DeleteObjectVersion,
+                                  obj);
+      if ((e == Effect::Deny) ||
+         (e == Effect::Pass && !acl_allowed)) {
+       send_partial_response(*iter, false, "", -EACCES);
+       continue;
+      }
+    }
 
     obj_ctx->obj.set_atomic(obj);
 
@@ -5232,11 +5928,14 @@ bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
     return false;
   }
 
+  auto policy = get_iam_policy_from_attr(s->cct, store, battrs, binfo.bucket.tenant);
+
   bucket_owner = bacl.get_owner();
 
   /* We can use global user_acl because each BulkDelete request is allowed
    * to work on entities from a single account only. */
-  return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE);
+  return verify_bucket_permission(s, binfo.bucket, s->user_acl.get(),
+                                 &bacl, policy, rgw::IAM::s3DeleteBucket);
 }
 
 bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
@@ -5291,7 +5990,7 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
       goto delop_fail;
     }
 
-    if (!store->get_zonegroup().is_master) {
+    if (!store->is_meta_master()) {
       bufferlist in_data;
       ret = forward_request_to_master(s, &ot.read_version, store, in_data,
                                       nullptr);
@@ -5449,7 +6148,7 @@ RGWBulkUploadOp::parse_path(const boost::string_ref& path)
     }
   }
 
-  return boost::none;
+  return none;
 }
 
 std::pair<std::string, std::string>
@@ -5670,6 +6369,7 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
 
 
 bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
+                                                   const rgw_obj& obj,
                                                     std::map<std::string, ceph::bufferlist>& battrs,
                                                     ACLOwner& bucket_owner /* out */)
 {
@@ -5681,8 +6381,21 @@ bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
     return false;
   }
 
+  auto policy = get_iam_policy_from_attr(s->cct, store, battrs, binfo.bucket.tenant);
+
   bucket_owner = bacl.get_owner();
-  return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE);
+  if (policy) {
+    auto e = policy->eval(s->env, *s->auth.identity,
+                         rgw::IAM::s3PutObject, obj);
+    if (e == Effect::Allow) {
+      return true;
+    } else if (e == Effect::Deny) {
+      return false;
+    }
+  }
+    
+  return verify_bucket_permission_no_policy(s, s->user_acl.get(),
+                                           &bacl, RGW_PERM_WRITE);
 }
 
 int RGWBulkUploadOp::handle_file(const boost::string_ref path,
@@ -5718,7 +6431,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
-  if (! handle_file_verify_permission(binfo, battrs, bowner)) {
+  if (! handle_file_verify_permission(binfo,
+                                     rgw_obj(binfo.bucket, object),
+                                     battrs, bowner)) {
     ldout(s->cct, 20) << "bulk upload: object creation unauthorized" << dendl;
     op_ret = -EACCES;
     return op_ret;
@@ -5730,6 +6445,11 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
+  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
   RGWPutObjProcessor_Atomic processor(obj_ctx,
                                       binfo,
                                       binfo.bucket,
@@ -5803,6 +6523,11 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
+  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
   unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
   hash.Final(m);
@@ -5968,11 +6693,13 @@ ssize_t RGWBulkUploadOp::AlignedStreamGetter::get_exactly(const size_t want,
 
 int RGWSetAttrs::verify_permission()
 {
+  // This looks to be part of the RGW-NFS machinery and has no S3 or
+  // Swift equivalent.
   bool perm;
   if (!s->object.empty()) {
-    perm = verify_object_permission(s, RGW_PERM_WRITE);
+    perm = verify_object_permission_no_policy(s, RGW_PERM_WRITE);
   } else {
-    perm = verify_bucket_permission(s, RGW_PERM_WRITE);
+    perm = verify_bucket_permission_no_policy(s, RGW_PERM_WRITE);
   }
   if (!perm)
     return -EACCES;
@@ -5993,9 +6720,8 @@ void RGWSetAttrs::execute()
 
   rgw_obj obj(s->bucket, s->object);
 
-  store->set_atomic(s->obj_ctx, obj);
-
   if (!s->object.empty()) {
+    store->set_atomic(s->obj_ctx, obj);
     op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, nullptr);
   } else {
     for (auto& iter : attrs) {
@@ -6031,6 +6757,77 @@ void RGWGetObjLayout::execute()
 }
 
 
+int RGWConfigBucketMetaSearch::verify_permission()
+{
+  if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWConfigBucketMetaSearch::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWConfigBucketMetaSearch::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "NOTICE: get_params() returned ret=" << op_ret << dendl;
+    return;
+  }
+
+  s->bucket_info.mdsearch_config = mdsearch_config;
+
+  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
+    return;
+  }
+}
+
+int RGWGetBucketMetaSearch::verify_permission()
+{
+  if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketMetaSearch::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+int RGWDelBucketMetaSearch::verify_permission()
+{
+  if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWDelBucketMetaSearch::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWDelBucketMetaSearch::execute()
+{
+  s->bucket_info.mdsearch_config.clear();
+
+  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
+    return;
+  }
+}
+
+
 RGWHandler::~RGWHandler()
 {
 }
@@ -6048,6 +6845,7 @@ int RGWHandler::init(RGWRados *_store,
 int RGWHandler::do_init_permissions()
 {
   int ret = rgw_build_bucket_policies(store, s);
+  s->env = rgw_build_iam_environment(store, s);
 
   if (ret < 0) {
     ldout(s->cct, 10) << "read_permissions on " << s->bucket << " ret=" << ret << dendl;
@@ -6085,3 +6883,142 @@ int RGWHandler::error_handler(int err_no, string *error_content) {
   // This is the do-nothing error handler
   return err_no;
 }
+
+
+void RGWPutBucketPolicy::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+int RGWPutBucketPolicy::verify_permission()
+{
+  if (!verify_bucket_permission(s, rgw::IAM::s3PutBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+int RGWPutBucketPolicy::get_params()
+{
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  // At some point when I have more time I want to make a version of
+  // rgw_rest_read_all_input that doesn't use malloc.
+  op_ret = rgw_rest_read_all_input(s, &data, &len, max_size, false);
+  // And throws exceptions.
+  return op_ret;
+}
+
+void RGWPutBucketPolicy::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  bufferlist in_data = bufferlist::static_from_mem(data, len);
+
+  if (!store->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  try {
+    const Policy p(s->cct, s->bucket_tenant, in_data);
+    op_ret = retry_raced_bucket_write(store, s, [&p, this] {
+       auto attrs = s->bucket_attrs;
+       attrs[RGW_ATTR_IAM_POLICY].clear();
+       attrs[RGW_ATTR_IAM_POLICY].append(p.text);
+       op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                     &s->bucket_info.objv_tracker);
+       return op_ret;
+      });
+  } catch (rgw::IAM::PolicyParseException& e) {
+    ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
+    op_ret = -EINVAL;
+  }
+}
+
+void RGWGetBucketPolicy::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/json");
+  dump_body(s, policy);
+}
+
+int RGWGetBucketPolicy::verify_permission()
+{
+  if (!verify_bucket_permission(s, rgw::IAM::s3GetBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketPolicy::execute()
+{
+  auto attrs = s->bucket_attrs;
+  map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_IAM_POLICY);
+  if (aiter == attrs.end()) {
+    ldout(s->cct, 0) << __func__ << " can't find bucket IAM POLICY attr" 
+                     << " bucket_name = " << s->bucket_name << dendl;
+    op_ret = -ERR_NO_SUCH_BUCKET_POLICY;
+    s->err.message = "The bucket policy does not exist";
+    return;
+  } else {
+    policy = attrs[RGW_ATTR_IAM_POLICY];
+
+    if (policy.length() == 0) {
+      ldout(s->cct, 10) << "The bucket policy does not exist, bucket: " << s->bucket_name << dendl;
+      op_ret = -ERR_NO_SUCH_BUCKET_POLICY;
+      s->err.message = "The bucket policy does not exist";
+      return;
+    }
+  } 
+}
+
+void RGWDeleteBucketPolicy::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+int RGWDeleteBucketPolicy::verify_permission()
+{
+  if (!verify_bucket_permission(s, rgw::IAM::s3DeleteBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWDeleteBucketPolicy::execute()
+{
+  op_ret = retry_raced_bucket_write(store, s, [this] {
+      auto attrs = s->bucket_attrs;
+      attrs.erase(RGW_ATTR_IAM_POLICY);
+      op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                   &s->bucket_info.objv_tracker);
+      return op_ret;
+    });
+}
+
+void RGWGetClusterStat::execute()
+{
+  op_ret = this->store->get_rados_handle()->cluster_stat(stats_op);
+}
+
+