]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_op.cc
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_op.cc
index 844b1dad5b020fdd3aedea13a20b18579f9caaca..c62804537790a3ea4dea7fb617dde6d9f400ec9b 100644 (file)
@@ -3,15 +3,20 @@
 
 #include <errno.h>
 #include <stdlib.h>
+#include <system_error>
 #include <unistd.h>
 
 #include <sstream>
 
 #include <boost/algorithm/string/predicate.hpp>
+#include <boost/bind.hpp>
 #include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
+#include <boost/utility/string_view.hpp>
 
 #include "common/Clock.h"
 #include "common/armor.h"
+#include "common/errno.h"
 #include "common/mime.h"
 #include "common/utf8.h"
 #include "common/ceph_json.h"
 using namespace std;
 using namespace librados;
 using ceph::crypto::MD5;
+using boost::optional;
+using boost::none;
 
+using rgw::IAM::ARN;
+using rgw::IAM::Effect;
+using rgw::IAM::Policy;
+
+using rgw::IAM::Policy;
 
 static string mp_ns = RGW_OBJ_NS_MULTIPART;
 static string shadow_ns = RGW_OBJ_NS_SHADOW;
@@ -140,9 +152,9 @@ static int decode_policy(CephContext *cct,
 
 
 static int get_user_policy_from_attr(CephContext * const cct,
-                                     RGWRados * const store,
-                                     map<string, bufferlist>& attrs,
-                                     RGWAccessControlPolicy& policy    /* out */)
+                                    RGWRados * const store,
+                                    map<string, bufferlist>& attrs,
+                                    RGWAccessControlPolicy& policy    /* out */)
 {
   auto aiter = attrs.find(RGW_ATTR_ACL);
   if (aiter != attrs.end()) {
@@ -158,11 +170,11 @@ static int get_user_policy_from_attr(CephContext * const cct,
 }
 
 static int get_bucket_instance_policy_from_attr(CephContext *cct,
-                                       RGWRados *store,
-                                       RGWBucketInfo& bucket_info,
-                                       map<string, bufferlist>& bucket_attrs,
-                                       RGWAccessControlPolicy *policy,
-                                       rgw_raw_obj& obj)
+                                               RGWRados *store,
+                                               RGWBucketInfo& bucket_info,
+                                               map<string, bufferlist>& bucket_attrs,
+                                               RGWAccessControlPolicy *policy,
+                                               rgw_raw_obj& obj)
 {
   map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_ACL);
 
@@ -184,12 +196,12 @@ static int get_bucket_instance_policy_from_attr(CephContext *cct,
 }
 
 static int get_obj_policy_from_attr(CephContext *cct,
-                                    RGWRados *store,
-                                    RGWObjectCtx& obj_ctx,
-                                    RGWBucketInfo& bucket_info,
-                                    map<string, bufferlist>& bucket_attrs,
-                                    RGWAccessControlPolicy *policy,
-                                    rgw_obj& obj)
+                                   RGWRados *store,
+                                   RGWObjectCtx& obj_ctx,
+                                   RGWBucketInfo& bucket_info,
+                                   map<string, bufferlist>& bucket_attrs,
+                                   RGWAccessControlPolicy *policy,
+                                   rgw_obj& obj)
 {
   bufferlist bl;
   int ret = 0;
@@ -224,15 +236,27 @@ static int get_obj_policy_from_attr(CephContext *cct,
  * Returns: 0 on success, -ERR# otherwise.
  */
 static int get_bucket_policy_from_attr(CephContext *cct,
-                                RGWRados *store,
-                                RGWBucketInfo& bucket_info,
-                                map<string, bufferlist>& bucket_attrs,
-                                RGWAccessControlPolicy *policy)
+                                      RGWRados *store,
+                                      RGWBucketInfo& bucket_info,
+                                      map<string, bufferlist>& bucket_attrs,
+                                      RGWAccessControlPolicy *policy)
 {
   rgw_raw_obj instance_obj;
   store->get_bucket_instance_obj(bucket_info.bucket, instance_obj);
   return get_bucket_instance_policy_from_attr(cct, store, bucket_info, bucket_attrs,
-                                              policy, instance_obj);
+                                             policy, instance_obj);
+}
+
+static optional<Policy> get_iam_policy_from_attr(CephContext* cct,
+                                                RGWRados* store,
+                                                map<string, bufferlist>& attrs,
+                                                const string& tenant) {
+  auto i = attrs.find(RGW_ATTR_IAM_POLICY);
+  if (i != attrs.end()) {
+    return Policy(cct, tenant, i->second);
+  } else {
+    return none;
+  }
 }
 
 static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map<string, bufferlist>& attrs)
@@ -241,7 +265,6 @@ static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
-  read_op.params.perr = &s->err;
 
   return read_op.prepare();
 }
@@ -253,7 +276,6 @@ static int modify_obj_attr(RGWRados *store, struct req_state *s, rgw_obj& obj, c
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
-  read_op.params.perr = &s->err;
   
   int r = read_op.prepare();
   if (r < 0) {
@@ -305,7 +327,8 @@ static int read_obj_policy(RGWRados *store,
                            struct req_state *s,
                            RGWBucketInfo& bucket_info,
                            map<string, bufferlist>& bucket_attrs,
-                           RGWAccessControlPolicy *policy,
+                           RGWAccessControlPolicy* acl,
+                          optional<Policy>& policy,
                            rgw_bucket& bucket,
                            rgw_obj_key& object)
 {
@@ -327,9 +350,11 @@ static int read_obj_policy(RGWRados *store,
   } else {
     obj = rgw_obj(bucket, object);
   }
+  policy = get_iam_policy_from_attr(s->cct, store, bucket_attrs, bucket.tenant);
+
   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
   int ret = get_obj_policy_from_attr(s->cct, store, *obj_ctx,
-                                     bucket_info, bucket_attrs, policy, obj);
+                                     bucket_info, bucket_attrs, acl, obj);
   if (ret == -ENOENT) {
     /* object does not exist checking the bucket's ACL to make sure
        that we send a proper error code */
@@ -466,7 +491,7 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
       /* we now need to make sure that the operation actually requires copy source, that is
        * it's a copy operation
        */
-      if (store->get_zonegroup().is_master && s->system_request) {
+      if (store->get_zonegroup().is_master_zonegroup() && s->system_request) {
         /*If this is the master, don't redirect*/
       } else if (!s->local_source ||
           (s->op != OP_PUT && s->op != OP_COPY) ||
@@ -505,6 +530,16 @@ int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
     }
   }
 
+  try {
+    s->iam_policy = get_iam_policy_from_attr(s->cct, store, s->bucket_attrs,
+                                            s->bucket_tenant);
+  } catch (const std::exception& e) {
+    // Really this is a can't happen condition. We parse the policy
+    // when it's given to us, so perhaps we should abort or otherwise
+    // raise bloody murder.
+    lderr(s->cct) << "Error reading IAM Policy: " << e.what() << dendl;
+    ret = -EACCES;
+  }
 
   return ret;
 }
@@ -532,13 +567,73 @@ int rgw_build_object_policies(RGWRados *store, struct req_state *s,
     if (prefetch_data) {
       store->set_prefetch_data(s->obj_ctx, obj);
     }
-    ret = read_obj_policy(store, s, s->bucket_info, s->bucket_attrs, s->object_acl, s->bucket, s->object);
+    ret = read_obj_policy(store, s, s->bucket_info, s->bucket_attrs, s->object_acl, s->iam_policy, s->bucket, s->object);
   }
 
   return ret;
 }
 
-static void rgw_bucket_object_pre_exec(struct req_state *s)
+rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
+                                               struct req_state* s)
+{
+  rgw::IAM::Environment e;
+  const auto& m = s->info.env->get_map();
+  auto t = ceph::real_clock::now();
+  e.emplace(std::piecewise_construct,
+           std::forward_as_tuple("aws:CurrentTime"),
+           std::forward_as_tuple(std::to_string(
+                                   ceph::real_clock::to_time_t(t))));
+  e.emplace(std::piecewise_construct,
+           std::forward_as_tuple("aws:EpochTime"),
+           std::forward_as_tuple(ceph::to_iso_8601(t)));
+  // TODO: This is fine for now, but once we have STS we'll need to
+  // look and see. Also this won't work with the IdentityApplier
+  // model, since we need to know the actual credential.
+  e.emplace(std::piecewise_construct,
+           std::forward_as_tuple("aws:PrincipalType"),
+           std::forward_as_tuple("User"));
+
+  auto i = m.find("HTTP_REFERER");
+  if (i != m.end()) {
+    e.emplace(std::piecewise_construct,
+             std::forward_as_tuple("aws:Referer"),
+             std::forward_as_tuple(i->second));
+  }
+
+  // These seem to be the semantics, judging from rest_rgw_s3.cc
+  i = m.find("SERVER_PORT_SECURE");
+  if (i != m.end()) {
+    e.emplace(std::piecewise_construct,
+             std::forward_as_tuple("aws:SecureTransport"),
+             std::forward_as_tuple("true"));
+  }
+
+  i = m.find("HTTP_HOST");
+  if (i != m.end()) {
+    e.emplace(std::piecewise_construct,
+             std::forward_as_tuple("aws:SourceIp"),
+             std::forward_as_tuple(i->second));
+  }
+
+  i = m.find("HTTP_USER_AGENT"); {
+  if (i != m.end())
+    e.emplace(std::piecewise_construct,
+             std::forward_as_tuple("aws:UserAgent"),
+             std::forward_as_tuple(i->second));
+  }
+
+  if (s->user) {
+    // What to do about aws::userid? One can have multiple access
+    // keys so that isn't really suitable. Do we have a durable
+    // identifier that can persist through name changes?
+    e.emplace(std::piecewise_construct,
+             std::forward_as_tuple("aws:username"),
+             std::forward_as_tuple(s->user->user_id.id));
+  }
+  return e;
+}
+
+void rgw_bucket_object_pre_exec(struct req_state *s)
 {
   if (s->expect_cont)
     dump_continue(s);
@@ -554,7 +649,21 @@ int RGWGetObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, obj);
   }
 
-  if (!verify_object_permission(s, RGW_PERM_READ)) {
+  if (torrent.get_flag()) {
+    if (obj.key.instance.empty()) {
+      action = rgw::IAM::s3GetObjectTorrent;
+    } else {
+      action = rgw::IAM::s3GetObjectVersionTorrent;
+    }
+  } else {
+    if (obj.key.instance.empty()) {
+      action = rgw::IAM::s3GetObject;
+    } else {
+      action = rgw::IAM::s3GetObjectVersion;
+    }
+  }
+
+  if (!verify_object_permission(s, action)) {
     return -EACCES;
   }
 
@@ -583,23 +692,18 @@ int RGWOp::verify_op_mask()
 
 int RGWOp::do_aws4_auth_completion()
 {
-  int ret;
-
-  if (s->aws4_auth_needs_complete) {
-    /* complete */
-    ret = RGW_Auth_S3::authorize_aws4_auth_complete(store, s);
-    s->aws4_auth_needs_complete = false;
-    if (ret) {
-      return ret;
-    }
-    /* verify signature */
-    if (s->aws4_auth->signature != s->aws4_auth->new_signature) {
-      ret = -ERR_SIGNATURE_NO_MATCH;
-      ldout(s->cct, 20) << "delayed aws4 auth failed" << dendl;
-      return ret;
+  ldout(s->cct, 5) << "NOTICE: call to do_aws4_auth_completion"  << dendl;
+  if (s->auth.completer) {
+    if (!s->auth.completer->complete()) {
+      return -ERR_AMZ_CONTENT_SHA256_MISMATCH;
+    } else {
+      dout(10) << "v4 auth ok -- do_aws4_auth_completion" << dendl;
     }
-    /* authorization ok */
-    dout(10) << "v4 auth ok" << dendl;
+
+    /* TODO(rzarzynski): yes, we're really called twice on PUTs. Only first
+     * call passes, so we disable second one. This is old behaviour, sorry!
+     * Plan for tomorrow: seek and destroy. */
+    s->auth.completer = nullptr;
   }
 
   return 0;
@@ -794,7 +898,8 @@ bool RGWOp::generate_cors_headers(string& origin, string& method, string& header
 
 int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
                                        const rgw_bucket_dir_entry& ent,
-                                       RGWAccessControlPolicy * const bucket_policy,
+                                       RGWAccessControlPolicy * const bucket_acl,
+                                       const optional<Policy>& bucket_policy,
                                        const off_t start_ofs,
                                        const off_t end_ofs)
 {
@@ -825,7 +930,6 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
   read_op.conds.if_match = ent.meta.etag.c_str();
   read_op.params.attrs = &attrs;
   read_op.params.obj_size = &obj_size;
-  read_op.params.perr = &s->err;
 
   op_ret = read_op.prepare();
   if (op_ret < 0)
@@ -870,8 +974,8 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
     ldout(s->cct, 2) << "overriding permissions due to system operation" << dendl;
   } else if (s->auth.identity->is_admin_of(s->user->user_id)) {
     ldout(s->cct, 2) << "overriding permissions due to admin operation" << dendl;
-  } else if (!verify_object_permission(s, s->user_acl.get(), bucket_policy,
-                                       &obj_policy, RGW_PERM_READ)) {
+  } else if (!verify_object_permission(s, part, s->user_acl.get(), bucket_acl,
+                                      &obj_policy, bucket_policy, action)) {
     return -EPERM;
   }
 
@@ -893,13 +997,15 @@ static int iterate_user_manifest_parts(CephContext * const cct,
                                        const off_t end,
                                        RGWBucketInfo *pbucket_info,
                                        const string& obj_prefix,
-                                       RGWAccessControlPolicy * const bucket_policy,
+                                       RGWAccessControlPolicy * const bucket_acl,
+                                       const optional<Policy>& bucket_policy,
                                        uint64_t * const ptotal_len,
                                        uint64_t * const pobj_size,
                                        string * const pobj_sum,
                                        int (*cb)(rgw_bucket& bucket,
                                                  const rgw_bucket_dir_entry& ent,
-                                                 RGWAccessControlPolicy * const bucket_policy,
+                                                 RGWAccessControlPolicy * const bucket_acl,
+                                                 const optional<Policy>& bucket_policy,
                                                  off_t start_ofs,
                                                  off_t end_ofs,
                                                  void *param),
@@ -932,7 +1038,7 @@ static int iterate_user_manifest_parts(CephContext * const cct,
       uint64_t cur_total_len = obj_ofs;
       uint64_t start_ofs = 0, end_ofs = ent.meta.size;
 
-      if (!found_start && cur_total_len + ent.meta.size > (uint64_t)ofs) {
+      if ((ptotal_len || cb) && !found_start && cur_total_len + ent.meta.size > (uint64_t)ofs) {
        start_ofs = ofs - obj_ofs;
        found_start = true;
       }
@@ -943,7 +1049,7 @@ static int iterate_user_manifest_parts(CephContext * const cct,
                         ent.meta.etag.length());
       }
 
-      if (!found_end && obj_ofs > (uint64_t)end) {
+      if ((ptotal_len || cb) && !found_end && obj_ofs > (uint64_t)end) {
        end_ofs = end - cur_total_len + 1;
        found_end = true;
       }
@@ -955,7 +1061,7 @@ static int iterate_user_manifest_parts(CephContext * const cct,
         len_count += end_ofs - start_ofs;
 
         if (cb) {
-          r = cb(bucket, ent, bucket_policy, start_ofs, end_ofs, cb_param);
+          r = cb(bucket, ent, bucket_acl, bucket_policy, start_ofs, end_ofs, cb_param);
           if (r < 0) {
             return r;
           }
@@ -981,13 +1087,12 @@ static int iterate_user_manifest_parts(CephContext * const cct,
 }
 
 struct rgw_slo_part {
-  RGWAccessControlPolicy *bucket_policy;
+  RGWAccessControlPolicy *bucket_acl = nullptr;
+  Policy* bucket_policy = nullptr;
   rgw_bucket bucket;
   string obj_name;
-  uint64_t size;
+  uint64_t size = 0;
   string etag;
-
-  rgw_slo_part() : bucket_policy(NULL), size(0) {}
 };
 
 static int iterate_slo_parts(CephContext *cct,
@@ -997,7 +1102,8 @@ static int iterate_slo_parts(CephContext *cct,
                              map<uint64_t, rgw_slo_part>& slo_parts,
                              int (*cb)(rgw_bucket& bucket,
                                        const rgw_bucket_dir_entry& ent,
-                                       RGWAccessControlPolicy *bucket_policy,
+                                       RGWAccessControlPolicy *bucket_acl,
+                                       const optional<Policy>& bucket_policy,
                                        off_t start_ofs,
                                        off_t end_ofs,
                                        void *param),
@@ -1046,8 +1152,12 @@ static int iterate_slo_parts(CephContext *cct,
 
     if (found_start) {
       if (cb) {
-        int r = cb(part.bucket, ent, part.bucket_policy, start_ofs, end_ofs, cb_param);
-        if (r < 0)
+       // SLO is a Swift thing, and Swift has no knowledge of S3 Policies.
+        int r = cb(part.bucket, ent, part.bucket_acl,
+                  (part.bucket_policy ?
+                   optional<Policy>(*part.bucket_policy) : none),
+                  start_ofs, end_ofs, cb_param);
+       if (r < 0)
           return r;
       }
     }
@@ -1060,36 +1170,36 @@ static int iterate_slo_parts(CephContext *cct,
 
 static int get_obj_user_manifest_iterate_cb(rgw_bucket& bucket,
                                             const rgw_bucket_dir_entry& ent,
-                                            RGWAccessControlPolicy * const bucket_policy,
+                                            RGWAccessControlPolicy * const bucket_acl,
+                                            const optional<Policy>& bucket_policy,
                                             const off_t start_ofs,
                                             const off_t end_ofs,
                                             void * const param)
 {
   RGWGetObj *op = static_cast<RGWGetObj *>(param);
-  return op->read_user_manifest_part(bucket, ent, bucket_policy, start_ofs, end_ofs);
+  return op->read_user_manifest_part(bucket, ent, bucket_acl, bucket_policy, start_ofs, end_ofs);
 }
 
 int RGWGetObj::handle_user_manifest(const char *prefix)
 {
-  ldout(s->cct, 2) << "RGWGetObj::handle_user_manifest() prefix=" << prefix << dendl;
+  const boost::string_view prefix_view(prefix);
+  ldout(s->cct, 2) << "RGWGetObj::handle_user_manifest() prefix="
+                   << prefix_view << dendl;
 
-  string prefix_str = prefix;
-  size_t pos = prefix_str.find('/');
-  if (pos == string::npos)
+  const size_t pos = prefix_view.find('/');
+  if (pos == string::npos) {
     return -EINVAL;
+  }
 
-  string bucket_name_raw, bucket_name;
-  bucket_name_raw = prefix_str.substr(0, pos);
-  url_decode(bucket_name_raw, bucket_name);
-
-  string obj_prefix_raw, obj_prefix;
-  obj_prefix_raw = prefix_str.substr(pos + 1);
-  url_decode(obj_prefix_raw, obj_prefix);
+  const std::string bucket_name = url_decode(prefix_view.substr(0, pos));
+  const std::string obj_prefix = url_decode(prefix_view.substr(pos + 1));
 
   rgw_bucket bucket;
 
-  RGWAccessControlPolicy _bucket_policy(s->cct);
-  RGWAccessControlPolicy *bucket_policy;
+  RGWAccessControlPolicy _bucket_acl(s->cct);
+  RGWAccessControlPolicy *bucket_acl;
+  optional<Policy> _bucket_policy;
+  optional<Policy>* bucket_policy;
   RGWBucketInfo bucket_info;
   RGWBucketInfo *pbucket_info;
 
@@ -1106,16 +1216,20 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
     }
     bucket = bucket_info.bucket;
     pbucket_info = &bucket_info;
-    bucket_policy = &_bucket_policy;
-    r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_policy, bucket);
+    bucket_acl = &_bucket_acl;
+    r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_acl, bucket);
     if (r < 0) {
       ldout(s->cct, 0) << "failed to read bucket policy" << dendl;
       return r;
     }
+    _bucket_policy = get_iam_policy_from_attr(s->cct, store, bucket_attrs,
+                                             bucket_info.bucket.tenant);
+    bucket_policy = &_bucket_policy;
   } else {
     bucket = s->bucket;
     pbucket_info = &s->bucket_info;
-    bucket_policy = s->bucket_acl;
+    bucket_acl = s->bucket_acl;
+    bucket_policy = &s->iam_policy;
   }
 
   /* dry run to find out:
@@ -1123,13 +1237,26 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
    * - overall DLO's content size,
    * - md5 sum of overall DLO's content (for etag of Swift API). */
   int r = iterate_user_manifest_parts(s->cct, store, ofs, end,
-        pbucket_info, obj_prefix, bucket_policy,
-        &total_len, &s->obj_size, &lo_etag,
+        pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
+        nullptr, &s->obj_size, &lo_etag,
         nullptr /* cb */, nullptr /* cb arg */);
   if (r < 0) {
     return r;
   }
 
+  r = RGWRados::Object::Read::range_to_ofs(s->obj_size, ofs, end);
+  if (r < 0) {
+    return r;
+  }
+
+  r = iterate_user_manifest_parts(s->cct, store, ofs, end,
+        pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
+        &total_len, nullptr, nullptr,
+        nullptr, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
   if (!get_data) {
     bufferlist bl;
     send_response_data(bl, 0, 0);
@@ -1137,7 +1264,7 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
   }
 
   r = iterate_user_manifest_parts(s->cct, store, ofs, end,
-        pbucket_info, obj_prefix, bucket_policy,
+        pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
         nullptr, nullptr, nullptr,
         get_obj_user_manifest_iterate_cb, (void *)this);
   if (r < 0) {
@@ -1164,8 +1291,8 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
   }
   ldout(s->cct, 2) << "RGWGetObj::handle_slo_manifest()" << dendl;
 
-  list<RGWAccessControlPolicy> allocated_policies;
-  map<string, RGWAccessControlPolicy *> policies;
+  vector<RGWAccessControlPolicy> allocated_acls;
+  map<string, pair<RGWAccessControlPolicy *, optional<Policy>>> policies;
   map<string, rgw_bucket> buckets;
 
   map<uint64_t, rgw_slo_part> slo_parts;
@@ -1197,16 +1324,18 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
     string obj_name = path.substr(pos_sep + 1);
 
     rgw_bucket bucket;
-    RGWAccessControlPolicy *bucket_policy;
+    RGWAccessControlPolicy *bucket_acl;
+    Policy* bucket_policy;
 
     if (bucket_name.compare(s->bucket.name) != 0) {
       const auto& piter = policies.find(bucket_name);
       if (piter != policies.end()) {
-        bucket_policy = piter->second;
-        bucket = buckets[bucket_name];
+        bucket_acl = piter->second.first;
+        bucket_policy = piter->second.second.get_ptr();
+       bucket = buckets[bucket_name];
       } else {
-        allocated_policies.push_back(RGWAccessControlPolicy(s->cct));
-        RGWAccessControlPolicy& _bucket_policy = allocated_policies.back();
+       allocated_acls.push_back(RGWAccessControlPolicy(s->cct));
+       RGWAccessControlPolicy& _bucket_acl = allocated_acls.back();
 
         RGWBucketInfo bucket_info;
         map<string, bufferlist> bucket_attrs;
@@ -1220,23 +1349,28 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
           return r;
         }
         bucket = bucket_info.bucket;
-        bucket_policy = &_bucket_policy;
-        r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_policy,
+        bucket_acl = &_bucket_acl;
+        r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_acl,
                                bucket);
         if (r < 0) {
-          ldout(s->cct, 0) << "failed to read bucket policy for bucket "
+          ldout(s->cct, 0) << "failed to read bucket ACL for bucket "
                            << bucket << dendl;
           return r;
-        }
-        buckets[bucket_name] = bucket;
-        policies[bucket_name] = bucket_policy;
+       }
+       auto _bucket_policy = get_iam_policy_from_attr(
+         s->cct, store, bucket_attrs, bucket_info.bucket.tenant);
+        bucket_policy = _bucket_policy.get_ptr();
+       buckets[bucket_name] = bucket;
+        policies[bucket_name] = make_pair(bucket_acl, _bucket_policy);
       }
     } else {
       bucket = s->bucket;
-      bucket_policy = s->bucket_acl;
+      bucket_acl = s->bucket_acl;
+      bucket_policy = s->iam_policy.get_ptr();
     }
 
     rgw_slo_part part;
+    part.bucket_acl = bucket_acl;
     part.bucket_policy = bucket_policy;
     part.bucket = bucket;
     part.obj_name = obj_name;
@@ -1261,17 +1395,14 @@ int RGWGetObj::handle_slo_manifest(bufferlist& bl)
   s->obj_size = slo_info.total_size;
   ldout(s->cct, 20) << "s->obj_size=" << s->obj_size << dendl;
 
-  if (ofs < 0) {
-    ofs = total_len - std::min(-ofs, static_cast<off_t>(total_len));
-  }
-
-  if (end < 0 || end >= static_cast<off_t>(total_len)) {
-    end = total_len - 1;
+  int r = RGWRados::Object::Read::range_to_ofs(total_len, ofs, end);
+  if (r < 0) {
+    return r;
   }
 
   total_len = end - ofs + 1;
 
-  int r = iterate_slo_parts(s->cct, store, ofs, end, slo_parts,
+  r = iterate_slo_parts(s->cct, store, ofs, end, slo_parts,
         get_obj_user_manifest_iterate_cb, (void *)this);
   if (r < 0) {
     return r;
@@ -1385,11 +1516,11 @@ void RGWGetObj::execute()
   read_op.params.attrs = &attrs;
   read_op.params.lastmod = &lastmod;
   read_op.params.obj_size = &s->obj_size;
-  read_op.params.perr = &s->err;
 
   op_ret = read_op.prepare();
   if (op_ret < 0)
     goto done_err;
+  version_id = read_op.state.obj.key.instance;
 
   /* STAT ops don't need data, and do no i/o */
   if (get_type() == RGW_OP_STAT_OBJ) {
@@ -1428,17 +1559,6 @@ void RGWGetObj::execute()
       decompress.emplace(s->cct, &cs_info, partial_content, filter);
       filter = &*decompress;
   }
-  // for range requests with obj size 0
-  if (range_str && !(s->obj_size)) {
-    total_len = 0;
-    op_ret = -ERANGE;
-    goto done_err;
-  }
-
-  op_ret = read_op.range_to_ofs(s->obj_size, ofs, end);
-  if (op_ret < 0)
-    goto done_err;
-  total_len = (ofs <= end ? end + 1 - ofs : 0);
 
   attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
   if (attr_iter != attrs.end() && !skip_manifest) {
@@ -1463,6 +1583,18 @@ void RGWGetObj::execute()
     return;
   }
 
+  // for range requests with obj size 0
+  if (range_str && !(s->obj_size)) {
+    total_len = 0;
+    op_ret = -ERANGE;
+    goto done_err;
+  }
+
+  op_ret = read_op.range_to_ofs(s->obj_size, ofs, end);
+  if (op_ret < 0)
+    goto done_err;
+  total_len = (ofs <= end ? end + 1 - ofs : 0);
+
   /* Check whether the object has expired. Swift API documentation
    * stands that we should return 404 Not Found in such case. */
   if (need_object_expiration() && object_is_expired(attrs)) {
@@ -1842,6 +1974,14 @@ void RGWSetBucketWebsite::execute()
   if (op_ret < 0)
     return;
 
+  if (!store->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << __func__ << " forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
   s->bucket_info.has_website = true;
   s->bucket_info.website_conf = website_conf;
 
@@ -1879,7 +2019,8 @@ void RGWDeleteBucketWebsite::execute()
 
 int RGWStatBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_READ)) {
+  // This (a HEAD request on a bucket) is governed by the s3:ListBucket permission.
+  if (!verify_bucket_permission(s, rgw::IAM::s3ListBucket)) {
     return -EACCES;
   }
 
@@ -1918,7 +2059,15 @@ void RGWStatBucket::execute()
 
 int RGWListBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_READ)) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
+  if (!verify_bucket_permission(s,
+                               list_versions ?
+                               rgw::IAM::s3ListBucketVersions :
+                               rgw::IAM::s3ListBucket)) {
     return -EACCES;
   }
 
@@ -1956,10 +2105,6 @@ void RGWListBucket::execute()
     return;
   }
 
-  op_ret = get_params();
-  if (op_ret < 0)
-    return;
-
   if (need_container_stats()) {
     map<string, RGWBucketEnt> m;
     m[s->bucket.name] = RGWBucketEnt();
@@ -1983,7 +2128,7 @@ void RGWListBucket::execute()
   list_op.params.list_versions = list_versions;
 
   op_ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
-  if (op_ret >= 0 && !delimiter.empty()) {
+  if (op_ret >= 0) {
     next_marker = list_op.get_next_marker();
   }
 }
@@ -2261,10 +2406,22 @@ void RGWCreateBucket::execute()
   if (op_ret < 0)
     return;
 
-  if (!store->get_zonegroup().is_master &&
+  if (!location_constraint.empty() &&
+      !store->has_zonegroup_api(location_constraint)) {
+      ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
+                       << " can't be found." << dendl;
+      op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
+      s->err.message = "The specified location-constraint is not valid";
+      return;
+  }
+
+  if (!store->get_zonegroup().is_master_zonegroup() &&
       store->get_zonegroup().api_name != location_constraint) {
-    ldout(s->cct, 0) << "location constraint (" << location_constraint << ") doesn't match zonegroup" << " (" << store->get_zonegroup().api_name << ")" << dendl;
-    op_ret = -EINVAL;
+    ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
+                     << " doesn't match zonegroup" << " (" << store->get_zonegroup().api_name << ")"
+                     << dendl;
+    op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
+    s->err.message = "The specified location-constraint is not valid";
     return;
   }
 
@@ -2482,7 +2639,7 @@ void RGWCreateBucket::execute()
 
 int RGWDeleteBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (!verify_bucket_permission(s, rgw::IAM::s3DeleteBucket)) {
     return -EACCES;
   }
 
@@ -2579,7 +2736,8 @@ int RGWPutObj::verify_permission()
 {
   if (copy_source) {
 
-    RGWAccessControlPolicy cs_policy(s->cct);
+    RGWAccessControlPolicy cs_acl(s->cct);
+    optional<Policy> policy;
     map<string, bufferlist> cs_attrs;
     rgw_bucket cs_bucket(copy_source_bucket_info.bucket);
     rgw_obj_key cs_object(copy_source_object_name, copy_source_version_id);
@@ -2589,19 +2747,45 @@ int RGWPutObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, obj);
 
     /* check source object permissions */
-    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_policy, cs_bucket, cs_object) < 0) {
+    if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl, policy,
+                       cs_bucket, cs_object) < 0) {
       return -EACCES;
     }
 
     /* admin request overrides permission checks */
-    if (! s->auth.identity->is_admin_of(cs_policy.get_owner().get_id()) &&
-        ! cs_policy.verify_permission(*s->auth.identity, s->perm_mask, RGW_PERM_READ)) {
-      return -EACCES;
+    if (! s->auth.identity->is_admin_of(cs_acl.get_owner().get_id())) {
+      if (policy) {
+       auto e = policy->eval(s->env, *s->auth.identity,
+                             cs_object.instance.empty() ?
+                             rgw::IAM::s3GetObject :
+                             rgw::IAM::s3GetObjectVersion,
+                             rgw::IAM::ARN(obj));
+       if (e == Effect::Deny) {
+         return -EACCES; 
+       } else if (e == Effect::Pass &&
+                  !cs_acl.verify_permission(*s->auth.identity, s->perm_mask,
+                                               RGW_PERM_READ)) {
+         return -EACCES;
+       }
+      } else if (!cs_acl.verify_permission(*s->auth.identity, s->perm_mask,
+                                          RGW_PERM_READ)) {
+       return -EACCES;
+      }
     }
+  }
 
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
   }
 
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -2674,17 +2858,19 @@ int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
                                               map<string, bufferlist>& attrs,
                                               real_time delete_at,
                                               const char *if_match,
-                                              const char *if_nomatch, const string *user_data)
+                                              const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace)
 {
   complete_writing_data();
 
   RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj);
+  op_target.set_versioning_disabled(true);
   RGWRados::Object::Write head_obj_op(&op_target);
 
   head_obj_op.meta.set_mtime = set_mtime;
   head_obj_op.meta.mtime = mtime;
   head_obj_op.meta.owner = s->owner.get_id();
   head_obj_op.meta.delete_at = delete_at;
+  head_obj_op.meta.zones_trace = zones_trace;
 
   int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
   if (r < 0)
@@ -2959,6 +3145,11 @@ void RGWPutObj::execute()
       ldout(s->cct, 20) << "check_quota() returned ret=" << op_ret << dendl;
       goto done;
     }
+    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
+      goto done;
+    }
   }
 
   if (supplied_etag) {
@@ -2979,7 +3170,7 @@ void RGWPutObj::execute()
                                           s->bucket_info,
                                           obj);
     if (op_ret < 0) {
-      return;
+      goto done;
     }
   }
 
@@ -3014,17 +3205,17 @@ void RGWPutObj::execute()
   }
 
   do {
-    bufferlist data_in;
+    bufferlist data;
     if (fst > lst)
       break;
     if (!copy_source) {
-      len = get_data(data_in);
+      len = get_data(data);
     } else {
       uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
-      op_ret = get_data(fst, cur_lst, data_in);
+      op_ret = get_data(fst, cur_lst, data);
       if (op_ret < 0)
         goto done;
-      len = data_in.length();
+      len = data.length();
       s->content_length += len;
       fst += len;
     }
@@ -3033,19 +3224,12 @@ void RGWPutObj::execute()
       goto done;
     }
 
-    bufferlist &data = data_in;
-    if (len && s->aws4_auth_streaming_mode) {
-      /* use unwrapped data */
-      data = s->aws4_auth->bl;
-      len = data.length();
-    }
-
     if (need_calc_md5) {
       hash.Update((const byte *)data.c_str(), data.length());
     }
 
-    /* save data for producing torrent data */
-    torrent.save_data(data_in);
+    /* update torrrent */
+    torrent.update(data);
 
     /* do we need this operation to be synchronous? if we're dealing with an object with immutable
      * head, e.g., multipart object we need to make sure we're the first one writing to this object
@@ -3118,9 +3302,7 @@ void RGWPutObj::execute()
     }
   }
 
-  if (!chunked_upload &&
-      ofs != s->content_length &&
-      !s->aws4_auth_streaming_mode) {
+  if (!chunked_upload && ofs != s->content_length) {
     op_ret = -ERR_REQUEST_TIMEOUT;
     goto done;
   }
@@ -3128,30 +3310,11 @@ void RGWPutObj::execute()
 
   perfcounter->inc(l_rgw_put_b, s->obj_size);
 
-  if (s->aws4_auth_needs_complete) {
-
-    /* complete aws4 auth */
-
-    op_ret = RGW_Auth_S3::authorize_aws4_auth_complete(store, s);
-    if (op_ret) {
-      goto done;
-    }
-
-    s->aws4_auth_needs_complete = false;
-
-    /* verify signature */
-
-    if (s->aws4_auth->signature != s->aws4_auth->new_signature) {
-      op_ret = -ERR_SIGNATURE_NO_MATCH;
-      ldout(s->cct, 20) << "delayed aws4 auth failed" << dendl;
-      goto done;
-    }
-
-    /* authorization ok */
-
-    dout(10) << "v4 auth ok" << dendl;
-
+  op_ret = do_aws4_auth_completion();
+  if (op_ret < 0) {
+    goto done;
   }
+
   op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
                               user_quota, bucket_quota, s->obj_size);
   if (op_ret < 0) {
@@ -3159,6 +3322,12 @@ void RGWPutObj::execute()
     goto done;
   }
 
+  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
+    goto done;
+  }
+
   hash.Final(m);
 
   if (compressor && compressor->is_compressed()) {
@@ -3223,7 +3392,7 @@ void RGWPutObj::execute()
    * processing any input from user in order to prohibit overwriting. */
   if (slo_info) {
     bufferlist slo_userindicator_bl;
-    ::encode("True", slo_userindicator_bl);
+    slo_userindicator_bl.append("True", 4);
     emplace_attr(RGW_ATTR_SLO_UINDICATOR, std::move(slo_userindicator_bl));
   }
 
@@ -3236,7 +3405,7 @@ void RGWPutObj::execute()
   {
     torrent.init(s, store);
     torrent.set_create_date(mtime);
-    op_ret =  torrent.handle_data();
+    op_ret =  torrent.complete();
     if (0 != op_ret)
     {
       ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl;
@@ -3293,7 +3462,18 @@ void RGWPostObj::execute()
     return;
   }
 
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Deny) {
+      op_ret = -EACCES;
+      return;
+    } else if (e == Effect::Pass && !verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
+      op_ret = -EACCES;
+      return;
+    }
+  } else if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     op_ret = -EACCES;
     return;
   }
@@ -3317,6 +3497,11 @@ void RGWPostObj::execute()
       return;
     }
 
+    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+    if (op_ret < 0) {
+      return;
+    }
+
     RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                                         s->bucket_info,
                                         s->bucket,
@@ -3397,6 +3582,11 @@ void RGWPostObj::execute()
       return;
     }
 
+    op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+    if (op_ret < 0) {
+      return;
+    }
+
     hash.Final(m);
     buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
 
@@ -3559,7 +3749,7 @@ void RGWPutMetadataAccount::execute()
 
 int RGWPutMetadataBucket::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -3638,7 +3828,9 @@ void RGWPutMetadataBucket::execute()
 
 int RGWPutMetadataObject::verify_permission()
 {
-  if (!verify_object_permission(s, RGW_PERM_WRITE)) {
+  // This looks to be something specific to Swift. We could add
+  // operations like swift:PutMetadataObject to the Policy Engine.
+  if (!verify_object_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -3715,20 +3907,14 @@ int RGWDeleteObj::handle_slo_manifest(bufferlist& bl)
     const string& path_str = iter.path;
 
     const size_t sep_pos = path_str.find('/', 1 /* skip first slash */);
-    if (string::npos == sep_pos) {
+    if (boost::string_view::npos == sep_pos) {
       return -EINVAL;
     }
 
     RGWBulkDelete::acct_path_t path;
 
-    string bucket_name;
-    url_decode(path_str.substr(1, sep_pos - 1), bucket_name);
-
-    string obj_name;
-    url_decode(path_str.substr(sep_pos + 1), obj_name);
-
-    path.bucket_name = bucket_name;
-    path.obj_key = obj_name;
+    path.bucket_name = url_decode(path_str.substr(1, sep_pos - 1));
+    path.obj_key = url_decode(path_str.substr(sep_pos + 1));
 
     items.push_back(path);
   }
@@ -3749,7 +3935,19 @@ int RGWDeleteObj::handle_slo_manifest(bufferlist& bl)
 
 int RGWDeleteObj::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
+  if (s->iam_policy) {
+    auto r = s->iam_policy->eval(s->env, *s->auth.identity,
+                                s->object.instance.empty() ?
+                                rgw::IAM::s3DeleteObject :
+                                rgw::IAM::s3DeleteObjectVersion,
+                                ARN(s->bucket, s->object.name));
+    if (r == Effect::Allow)
+      return true;
+    else if (r == Effect::Deny)
+      return false;
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
   }
 
@@ -3866,9 +4064,7 @@ bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name,
     params_str = url_src.substr(pos + 1);
   }
 
-  string dec_src;
-
-  url_decode(name_str, dec_src);
+  std::string dec_src = url_decode(name_str);
   const char *src = dec_src.c_str();
 
   if (*src == '/') ++src;
@@ -3899,7 +4095,8 @@ bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name,
 
 int RGWCopyObj::verify_permission()
 {
-  RGWAccessControlPolicy src_policy(s->cct);
+  RGWAccessControlPolicy src_acl(s->cct);
+  optional<Policy> src_policy;
   op_ret = get_params();
   if (op_ret < 0)
     return op_ret;
@@ -3934,17 +4131,32 @@ int RGWCopyObj::verify_permission()
     store->set_prefetch_data(s->obj_ctx, src_obj);
 
     /* check source object permissions */
-    op_ret = read_obj_policy(store, s, src_bucket_info, src_attrs, &src_policy,
-                         src_bucket, src_object);
+    op_ret = read_obj_policy(store, s, src_bucket_info, src_attrs, &src_acl,
+                            src_policy, src_bucket, src_object);
     if (op_ret < 0) {
       return op_ret;
     }
 
     /* admin request overrides permission checks */
-    if (! s->auth.identity->is_admin_of(src_policy.get_owner().get_id()) &&
-        ! src_policy.verify_permission(*s->auth.identity, s->perm_mask,
-                                       RGW_PERM_READ)) {
-      return -EACCES;
+    if (!s->auth.identity->is_admin_of(src_acl.get_owner().get_id())) {
+      if (src_policy) {
+       auto e = src_policy->eval(s->env, *s->auth.identity,
+                                 src_object.instance.empty() ?
+                                 rgw::IAM::s3GetObject :
+                                 rgw::IAM::s3GetObjectVersion,
+                                 ARN(src_obj));
+       if (e == Effect::Deny) {
+         return -EACCES;
+       } else if (e == Effect::Pass &&
+                  !src_acl.verify_permission(*s->auth.identity, s->perm_mask,
+                                             RGW_PERM_READ)) { 
+         return -EACCES;
+       }
+      } else if (!src_acl.verify_permission(*s->auth.identity,
+                                              s->perm_mask,
+                                           RGW_PERM_READ)) {
+       return -EACCES;
+      }
     }
   }
 
@@ -4097,7 +4309,6 @@ void RGWCopyObj::execute()
                           (version_id.empty() ? NULL : &version_id),
                           &s->req_id, /* use req_id as tag */
                           &etag,
-                          &s->err,
                           copy_obj_progress_cb, (void *)this
     );
 }
@@ -4106,9 +4317,12 @@ int RGWGetACLs::verify_permission()
 {
   bool perm;
   if (!s->object.empty()) {
-    perm = verify_object_permission(s, RGW_PERM_READ_ACP);
+    perm = verify_object_permission(s,
+                                   s->object.instance.empty() ?
+                                   rgw::IAM::s3GetObjectAcl :
+                                   rgw::IAM::s3GetObjectVersionAcl);
   } else {
-    perm = verify_bucket_permission(s, RGW_PERM_READ_ACP);
+    perm = verify_bucket_permission(s, rgw::IAM::s3GetObjectAcl);
   }
   if (!perm)
     return -EACCES;
@@ -4136,9 +4350,12 @@ int RGWPutACLs::verify_permission()
 {
   bool perm;
   if (!s->object.empty()) {
-    perm = verify_object_permission(s, RGW_PERM_WRITE_ACP);
+    perm = verify_object_permission(s,
+                                   s->object.instance.empty() ?
+                                   rgw::IAM::s3PutObjectAcl :
+                                   rgw::IAM::s3PutObjectVersionAcl);
   } else {
-    perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP);
+    perm = verify_bucket_permission(s, rgw::IAM::s3PutBucketAcl);
   }
   if (!perm)
     return -EACCES;
@@ -4149,7 +4366,7 @@ int RGWPutACLs::verify_permission()
 int RGWGetLC::verify_permission()
 {
   bool perm;
-  perm = verify_bucket_permission(s, RGW_PERM_READ_ACP);
+  perm = verify_bucket_permission(s, rgw::IAM::s3GetLifecycleConfiguration);
   if (!perm)
     return -EACCES;
 
@@ -4159,7 +4376,7 @@ int RGWGetLC::verify_permission()
 int RGWPutLC::verify_permission()
 {
   bool perm;
-  perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP);
+  perm = verify_bucket_permission(s, rgw::IAM::s3PutLifecycleConfiguration);
   if (!perm)
     return -EACCES;
 
@@ -4169,7 +4386,7 @@ int RGWPutLC::verify_permission()
 int RGWDeleteLC::verify_permission()
 {
   bool perm;
-  perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP);
+  perm = verify_bucket_permission(s, rgw::IAM::s3PutLifecycleConfiguration);
   if (!perm)
     return -EACCES;
 
@@ -4485,6 +4702,14 @@ void RGWPutCORS::execute()
   if (op_ret < 0)
     return;
 
+  if (!store->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << __func__ << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
   map<string, bufferlist> attrs = s->bucket_attrs;
   attrs[RGW_ATTR_CORS] = cors_bl;
   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
@@ -4635,8 +4860,20 @@ void RGWSetRequestPayment::execute()
 
 int RGWInitMultipart::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
+  }
 
   return 0;
 }
@@ -4740,8 +4977,20 @@ static int get_multipart_info(RGWRados *store, struct req_state *s,
 
 int RGWCompleteMultipart::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3PutObject,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
+  }
 
   return 0;
 }
@@ -4986,8 +5235,20 @@ void RGWCompleteMultipart::execute()
 
 int RGWAbortMultipart::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  if (s->iam_policy) {
+    auto e = s->iam_policy->eval(s->env, *s->auth.identity,
+                                rgw::IAM::s3AbortMultipartUpload,
+                                rgw_obj(s->bucket, s->object));
+    if (e == Effect::Allow) {
+      return 0;
+    } else if (e == Effect::Deny) {
+      return -EACCES;
+    }
+  }
+
+  if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
     return -EACCES;
+  }
 
   return 0;
 }
@@ -5023,7 +5284,7 @@ void RGWAbortMultipart::execute()
 
 int RGWListMultipart::verify_permission()
 {
-  if (!verify_object_permission(s, RGW_PERM_READ))
+  if (!verify_object_permission(s, rgw::IAM::s3ListMultipartUploadParts))
     return -EACCES;
 
   return 0;
@@ -5057,7 +5318,8 @@ void RGWListMultipart::execute()
 
 int RGWListBucketMultiparts::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_READ))
+  if (!verify_bucket_permission(s,
+                               rgw::IAM::s3ListBucketMultiPartUploads))
     return -EACCES;
 
   return 0;
@@ -5129,7 +5391,8 @@ void RGWGetHealthCheck::execute()
 
 int RGWDeleteMultiObj::verify_permission()
 {
-  if (!verify_bucket_permission(s, RGW_PERM_WRITE))
+  acl_allowed = verify_bucket_permission_no_policy(s, RGW_PERM_WRITE);
+  if (!acl_allowed && !s->iam_policy)
     return -EACCES;
 
   return 0;
@@ -5186,6 +5449,19 @@ void RGWDeleteMultiObj::execute()
         iter != multi_delete->objects.end() && num_processed < max_to_delete;
         ++iter, num_processed++) {
     rgw_obj obj(bucket, *iter);
+    if (s->iam_policy) {
+      auto e = s->iam_policy->eval(s->env,
+                                  *s->auth.identity,
+                                  iter->instance.empty() ?
+                                  rgw::IAM::s3DeleteObject :
+                                  rgw::IAM::s3DeleteObjectVersion,
+                                  obj);
+      if ((e == Effect::Deny) ||
+         (e == Effect::Pass && !acl_allowed)) {
+       send_partial_response(*iter, false, "", -EACCES);
+       continue;
+      }
+    }
 
     obj_ctx->obj.set_atomic(obj);
 
@@ -5232,11 +5508,14 @@ bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
     return false;
   }
 
+  auto policy = get_iam_policy_from_attr(s->cct, store, battrs, binfo.bucket.tenant);
+
   bucket_owner = bacl.get_owner();
 
   /* We can use global user_acl because each BulkDelete request is allowed
    * to work on entities from a single account only. */
-  return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE);
+  return verify_bucket_permission(s, binfo.bucket, s->user_acl.get(),
+                                 &bacl, policy, rgw::IAM::s3DeleteBucket);
 }
 
 bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
@@ -5291,7 +5570,7 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
       goto delop_fail;
     }
 
-    if (!store->get_zonegroup().is_master) {
+    if (!store->get_zonegroup().is_master_zonegroup()) {
       bufferlist in_data;
       ret = forward_request_to_master(s, &ot.read_version, store, in_data,
                                       nullptr);
@@ -5449,7 +5728,7 @@ RGWBulkUploadOp::parse_path(const boost::string_ref& path)
     }
   }
 
-  return boost::none;
+  return none;
 }
 
 std::pair<std::string, std::string>
@@ -5670,6 +5949,7 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
 
 
 bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
+                                                   const rgw_obj& obj,
                                                     std::map<std::string, ceph::bufferlist>& battrs,
                                                     ACLOwner& bucket_owner /* out */)
 {
@@ -5681,8 +5961,21 @@ bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
     return false;
   }
 
+  auto policy = get_iam_policy_from_attr(s->cct, store, battrs, binfo.bucket.tenant);
+
   bucket_owner = bacl.get_owner();
-  return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE);
+  if (policy) {
+    auto e = policy->eval(s->env, *s->auth.identity,
+                         rgw::IAM::s3PutObject, obj);
+    if (e == Effect::Allow) {
+      return true;
+    } else if (e == Effect::Deny) {
+      return false;
+    }
+  }
+    
+  return verify_bucket_permission_no_policy(s, s->user_acl.get(),
+                                           &bacl, RGW_PERM_WRITE);
 }
 
 int RGWBulkUploadOp::handle_file(const boost::string_ref path,
@@ -5718,7 +6011,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
-  if (! handle_file_verify_permission(binfo, battrs, bowner)) {
+  if (! handle_file_verify_permission(binfo,
+                                     rgw_obj(binfo.bucket, object),
+                                     battrs, bowner)) {
     ldout(s->cct, 20) << "bulk upload: object creation unauthorized" << dendl;
     op_ret = -EACCES;
     return op_ret;
@@ -5730,6 +6025,11 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
+  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
   RGWPutObjProcessor_Atomic processor(obj_ctx,
                                       binfo,
                                       binfo.bucket,
@@ -5803,6 +6103,11 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     return op_ret;
   }
 
+  op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
   unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
   hash.Final(m);
@@ -5968,11 +6273,13 @@ ssize_t RGWBulkUploadOp::AlignedStreamGetter::get_exactly(const size_t want,
 
 int RGWSetAttrs::verify_permission()
 {
+  // This looks to be part of the RGW-NFS machinery and has no S3 or
+  // Swift equivalent.
   bool perm;
   if (!s->object.empty()) {
-    perm = verify_object_permission(s, RGW_PERM_WRITE);
+    perm = verify_object_permission_no_policy(s, RGW_PERM_WRITE);
   } else {
-    perm = verify_bucket_permission(s, RGW_PERM_WRITE);
+    perm = verify_bucket_permission_no_policy(s, RGW_PERM_WRITE);
   }
   if (!perm)
     return -EACCES;
@@ -5993,9 +6300,8 @@ void RGWSetAttrs::execute()
 
   rgw_obj obj(s->bucket, s->object);
 
-  store->set_atomic(s->obj_ctx, obj);
-
   if (!s->object.empty()) {
+    store->set_atomic(s->obj_ctx, obj);
     op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, nullptr);
   } else {
     for (auto& iter : attrs) {
@@ -6031,6 +6337,77 @@ void RGWGetObjLayout::execute()
 }
 
 
+int RGWConfigBucketMetaSearch::verify_permission()
+{
+  if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWConfigBucketMetaSearch::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWConfigBucketMetaSearch::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "NOTICE: get_params() returned ret=" << op_ret << dendl;
+    return;
+  }
+
+  s->bucket_info.mdsearch_config = mdsearch_config;
+
+  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
+    return;
+  }
+}
+
+int RGWGetBucketMetaSearch::verify_permission()
+{
+  if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketMetaSearch::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+int RGWDelBucketMetaSearch::verify_permission()
+{
+  if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWDelBucketMetaSearch::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWDelBucketMetaSearch::execute()
+{
+  s->bucket_info.mdsearch_config.clear();
+
+  op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
+    return;
+  }
+}
+
+
 RGWHandler::~RGWHandler()
 {
 }
@@ -6048,6 +6425,7 @@ int RGWHandler::init(RGWRados *_store,
 int RGWHandler::do_init_permissions()
 {
   int ret = rgw_build_bucket_policies(store, s);
+  s->env = rgw_build_iam_environment(store, s);
 
   if (ret < 0) {
     ldout(s->cct, 10) << "read_permissions on " << s->bucket << " ret=" << ret << dendl;
@@ -6085,3 +6463,135 @@ int RGWHandler::error_handler(int err_no, string *error_content) {
   // This is the do-nothing error handler
   return err_no;
 }
+
+
+void RGWPutBucketPolicy::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+int RGWPutBucketPolicy::verify_permission()
+{
+  if (!verify_bucket_permission(s, rgw::IAM::s3PutBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+int RGWPutBucketPolicy::get_params()
+{
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  // At some point when I have more time I want to make a version of
+  // rgw_rest_read_all_input that doesn't use malloc.
+  op_ret = rgw_rest_read_all_input(s, &data, &len, max_size, false);
+  // And throws exceptions.
+  return op_ret;
+}
+
+void RGWPutBucketPolicy::execute()
+{
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  bufferlist in_data = bufferlist::static_from_mem(data, len);
+
+  if (!store->is_meta_master()) {
+    op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << "forward_request_to_master returned ret=" << op_ret << dendl;
+      return;
+    }
+  }
+
+  try {
+    Policy p(s->cct, s->bucket_tenant, in_data);
+    auto attrs = s->bucket_attrs;
+    attrs[RGW_ATTR_IAM_POLICY].clear();
+    attrs[RGW_ATTR_IAM_POLICY].append(p.text);
+    op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                                 &s->bucket_info.objv_tracker);
+    if (op_ret == -ECANCELED) {
+      op_ret = 0; /* lost a race, but it's ok because policies are immutable */
+    }
+  } catch (rgw::IAM::PolicyParseException& e) {
+    ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
+    op_ret = -EINVAL;
+  }
+}
+
+void RGWGetBucketPolicy::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/json");
+  dump_body(s, policy);
+}
+
+int RGWGetBucketPolicy::verify_permission()
+{
+  if (!verify_bucket_permission(s, rgw::IAM::s3GetBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWGetBucketPolicy::execute()
+{
+  auto attrs = s->bucket_attrs;
+  map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_IAM_POLICY);
+  if (aiter == attrs.end()) {
+    ldout(s->cct, 0) << __func__ << " can't find bucket IAM POLICY attr" 
+                     << " bucket_name = " << s->bucket_name << dendl;
+    op_ret = -ERR_NO_SUCH_BUCKET_POLICY;
+    s->err.message = "The bucket policy does not exist";
+    return;
+  } else {
+    policy = attrs[RGW_ATTR_IAM_POLICY];
+
+    if (policy.length() == 0) {
+      ldout(s->cct, 10) << "The bucket policy does not exist, bucket: " << s->bucket_name << dendl;
+      op_ret = -ERR_NO_SUCH_BUCKET_POLICY;
+      s->err.message = "The bucket policy does not exist";
+      return;
+    }
+  } 
+}
+
+void RGWDeleteBucketPolicy::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+int RGWDeleteBucketPolicy::verify_permission()
+{
+  if (!verify_bucket_permission(s, rgw::IAM::s3DeleteBucketPolicy)) {
+    return -EACCES;
+  }
+
+  return 0;
+}
+
+void RGWDeleteBucketPolicy::execute()
+{
+  auto attrs = s->bucket_attrs;
+  attrs.erase(RGW_ATTR_IAM_POLICY);
+  op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
+                               &s->bucket_info.objv_tracker);
+  if (op_ret == -ECANCELED) {
+    op_ret = 0; /* lost a race, but it's ok because policies are immutable */
+  }
+}