]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rest_s3.cc
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_rest_s3.cc
index b41441160f58f5a32134a2ada39f8755cabc5699..ec9011a06bc44d7640a563bc826e54709bb34237 100644 (file)
 #include "common/utf8.h"
 #include "common/ceph_json.h"
 #include "common/safe_io.h"
+#include "common/backport14.h"
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/replace.hpp>
+#include <boost/utility/string_view.hpp>
 
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
@@ -22,6 +24,7 @@
 #include "rgw_user.h"
 #include "rgw_cors.h"
 #include "rgw_cors_s3.h"
+#include "rgw_tag_s3.h"
 
 #include "rgw_client_io.h"
 
@@ -29,6 +32,8 @@
 #include "rgw_auth_keystone.h"
 #include "rgw_auth_registry.h"
 
+#include "rgw_es_query.h"
+
 #include <typeinfo> // for 'typeid'
 
 #include "rgw_ldap.h"
@@ -65,14 +70,13 @@ void dump_bucket(struct req_state *s, RGWBucketEnt& obj)
   s->formatter->close_section();
 }
 
-void rgw_get_errno_s3(rgw_http_errors *e , int err_no)
+void rgw_get_errno_s3(rgw_http_error *e , int err_no)
 {
-  const struct rgw_http_errors *r;
-  r = search_err(err_no, RGW_HTTP_ERRORS, ARRAY_LEN(RGW_HTTP_ERRORS));
+  rgw_http_errors::const_iterator r = rgw_http_s3_errors.find(err_no);
 
-  if (r) {
-    e->http_ret = r->http_ret;
-    e->s3_code = r->s3_code;
+  if (r != rgw_http_s3_errors.end()) {
+    e->http_ret = r->second.first;
+    e->s3_code = r->second.second;
   } else {
     e->http_ret = 500;
     e->s3_code = "UnknownError";
@@ -125,6 +129,12 @@ int RGWGetObj_ObjStore_S3::get_params()
   // all of the data from its parts. the parts will sync as separate objects
   skip_manifest = s->info.args.exists(RGW_SYS_PARAM_PREFIX "sync-manifest");
 
+  // multisite sync requests should fetch encrypted data, along with the
+  // attributes needed to support decryption on the other zone
+  if (s->system_request) {
+    skip_decrypt = s->info.args.exists(RGW_SYS_PARAM_PREFIX "skip-decrypt");
+  }
+
   return RGWGetObj_ObjStore::get_params();
 }
 
@@ -173,7 +183,7 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
     dump_errno(s, custom_http_ret);
   } else {
     set_req_state_err(s, (partial_content && !op_ret) ? STATUS_PARTIAL_CONTENT
-                 : op_ret);
+                  : op_ret);
     dump_errno(s);
   }
 
@@ -235,6 +245,10 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
 
   dump_content_length(s, total_len);
   dump_last_modified(s, lastmod);
+  if (!version_id.empty()) {
+    dump_header(s, "x-amz-version-id", version_id);
+  }
+  
 
   if (! op_ret) {
     if (! lo_etag.empty()) {
@@ -276,11 +290,23 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
         if (!content_type) {
           content_type = iter->second.c_str();
         }
+      } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) {
+        // this attr has an extra length prefix from ::encode() in prior versions
+        dump_header(s, "X-Object-Meta-Static-Large-Object", "True");
       } else if (strncmp(name, RGW_ATTR_META_PREFIX,
                         sizeof(RGW_ATTR_META_PREFIX)-1) == 0) {
         /* User custom metadata. */
         name += sizeof(RGW_ATTR_PREFIX) - 1;
         dump_header(s, name, iter->second);
+      } else if (iter->first.compare(RGW_ATTR_TAGS) == 0) {
+        RGWObjTags obj_tags;
+        try{
+          bufferlist::iterator it = iter->second.begin();
+          obj_tags.decode(it);
+        } catch (buffer::error &err) {
+          ldout(s->cct,0) << "Error caught buffer::error couldn't decode TagSet " << dendl;
+        }
+        dump_header(s, RGW_AMZ_TAG_COUNT, obj_tags.count());
       }
     }
   }
@@ -317,6 +343,10 @@ send_data:
 
 int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetDataCB> *filter, RGWGetDataCB* cb, bufferlist* manifest_bl)
 {
+  if (skip_decrypt) { // bypass decryption for multisite sync requests
+    return 0;
+  }
+
   int res = 0;
   std::unique_ptr<BlockCrypt> block_crypt;
   res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses);
@@ -337,6 +367,97 @@ int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetDataCB> *fil
   return res;
 }
 
+void RGWGetObjTags_ObjStore_S3::send_response_data(bufferlist& bl)
+{
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  s->formatter->open_object_section_in_ns("Tagging", XMLNS_AWS_S3);
+  s->formatter->open_object_section("TagSet");
+  if (has_tags){
+    RGWObjTagSet_S3 tagset;
+    bufferlist::iterator iter = bl.begin();
+    try {
+      tagset.decode(iter);
+    } catch (buffer::error& err) {
+      ldout(s->cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+      op_ret= -EIO;
+      return;
+    }
+    tagset.dump_xml(s->formatter);
+  }
+  s->formatter->close_section();
+  s->formatter->close_section();
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+
+int RGWPutObjTags_ObjStore_S3::get_params()
+{
+  RGWObjTagsXMLParser parser;
+
+  if (!parser.init()){
+    return -EINVAL;
+  }
+
+  char *data=nullptr;
+  int len=0;
+
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  int r = rgw_rest_read_all_input(s, &data, &len, max_size, false);
+
+  if (r < 0)
+    return r;
+
+  auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
+
+  if (!parser.parse(data, len, 1)) {
+    return -ERR_MALFORMED_XML;
+  }
+
+  RGWObjTagSet_S3 *obj_tags_s3;
+  RGWObjTagging_S3 *tagging;
+
+  tagging = static_cast<RGWObjTagging_S3 *>(parser.find_first("Tagging"));
+  obj_tags_s3 = static_cast<RGWObjTagSet_S3 *>(tagging->find_first("TagSet"));
+  if(!obj_tags_s3){
+    return -ERR_MALFORMED_XML;
+  }
+
+  RGWObjTags obj_tags;
+  r = obj_tags_s3->rebuild(obj_tags);
+  if (r < 0)
+    return r;
+
+  obj_tags.encode(tags_bl);
+  ldout(s->cct, 20) << "Read " << obj_tags.count() << "tags" << dendl;
+
+  return 0;
+}
+
+void RGWPutObjTags_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+}
+
+void RGWDeleteObjTags_ObjStore_S3::send_response()
+{
+  int r = op_ret;
+  if (r == -ENOENT)
+    r = 0;
+  if (!r)
+    r = STATUS_NO_CONTENT;
+
+  set_req_state_err(s, r);
+  dump_errno(s);
+  end_header(s, this);
+}
 
 void RGWListBuckets_ObjStore_S3::send_response_begin(bool has_buckets)
 {
@@ -404,6 +525,15 @@ static void dump_usage_categories_info(Formatter *formatter, const rgw_usage_log
   formatter->close_section(); // Category
 }
 
+static void dump_usage_bucket_info(Formatter *formatter, const std::string& name, const cls_user_bucket_entry& entry)
+{
+  formatter->open_object_section("Entry");
+  formatter->dump_string("Bucket", name);
+  formatter->dump_int("Bytes", entry.size);
+  formatter->dump_int("Bytes_Rounded", entry.size_rounded);
+  formatter->close_section(); // entry
+}
+
 void RGWGetUsage_ObjStore_S3::send_response()
 {
   if (op_ret < 0)
@@ -459,7 +589,7 @@ void RGWGetUsage_ObjStore_S3::send_response()
      }
      formatter->close_section(); // entries
    }
-  
+
    if (show_log_sum) {
      formatter->open_array_section("Summary");
      map<string, rgw_usage_log_entry>::iterator siter;
@@ -475,26 +605,38 @@ void RGWGetUsage_ObjStore_S3::send_response()
        formatter->dump_int("BytesReceived", total_usage.bytes_received);
        formatter->dump_int("Ops", total_usage.ops);
        formatter->dump_int("SuccessfulOps", total_usage.successful_ops);
-       formatter->close_section(); // total 
+       formatter->close_section(); // total
        formatter->close_section(); // user
      }
 
      if (s->cct->_conf->rgw_rest_getusage_op_compat) {
-       formatter->open_object_section("Stats"); 
-     } 
-       
+       formatter->open_object_section("Stats");
+     }
+
      formatter->dump_int("TotalBytes", header.stats.total_bytes);
      formatter->dump_int("TotalBytesRounded", header.stats.total_bytes_rounded);
      formatter->dump_int("TotalEntries", header.stats.total_entries);
-     
-     if (s->cct->_conf->rgw_rest_getusage_op_compat) { 
+
+     if (s->cct->_conf->rgw_rest_getusage_op_compat) {
        formatter->close_section(); //Stats
      }
 
      formatter->close_section(); // summary
    }
-   formatter->close_section(); // usage
-   rgw_flush_formatter_and_reset(s, s->formatter);
+
+  formatter->open_array_section("CapacityUsed");
+  formatter->open_object_section("User");
+  formatter->open_array_section("Buckets");
+  for (const auto& biter : buckets_usage) {
+    const cls_user_bucket_entry& entry = biter.second;
+    dump_usage_bucket_info(formatter, biter.first, entry);
+  }
+  formatter->close_section(); // Buckets
+  formatter->close_section(); // User
+  formatter->close_section(); // CapacityUsed
+
+  formatter->close_section(); // usage
+  rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
 int RGWListBucket_ObjStore_S3::get_params()
@@ -539,8 +681,11 @@ void RGWListBucket_ObjStore_S3::send_versioned_response()
   s->formatter->dump_string("Name", s->bucket_name);
   s->formatter->dump_string("Prefix", prefix);
   s->formatter->dump_string("KeyMarker", marker.name);
-  if (is_truncated && !next_marker.empty())
+  s->formatter->dump_string("VersionIdMarker", marker.instance);
+  if (is_truncated && !next_marker.empty()) {
     s->formatter->dump_string("NextKeyMarker", next_marker.name);
+    s->formatter->dump_string("NextVersionIdMarker", next_marker.instance);
+  }
   s->formatter->dump_int("MaxKeys", max);
   if (!delimiter.empty())
     s->formatter->dump_string("Delimiter", delimiter);
@@ -783,11 +928,9 @@ int RGWSetBucketVersioning_ObjStore_S3::get_params()
   
   auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
 
-  if (s->aws4_auth_needs_complete) {
-    int ret_auth = do_aws4_auth_completion();
-    if (ret_auth < 0) {
-      return ret_auth;
-    }
+  r = do_aws4_auth_completion();
+  if (r < 0) {
+    return r;
   }
 
   RGWSetBucketVersioningParser parser;
@@ -835,13 +978,14 @@ int RGWSetBucketWebsite_ObjStore_S3::get_params()
 
   auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
 
-  if (s->aws4_auth_needs_complete) {
-      int ret_auth = do_aws4_auth_completion();
-      if (ret_auth < 0) {
-        return ret_auth;
-      }
+  r = do_aws4_auth_completion();
+  if (r < 0) {
+    return r;
   }
 
+  bufferptr in_ptr(data, len);
+  in_data.append(in_ptr);
+
   RGWXMLDecoder::XMLParser parser;
   if (!parser.init()) {
     ldout(s->cct, 0) << "ERROR: failed to initialize parser" << dendl;
@@ -1006,11 +1150,9 @@ int RGWCreateBucket_ObjStore_S3::get_params()
 
   auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
 
-  if (s->aws4_auth_needs_complete) {
-    int ret_auth = do_aws4_auth_completion();
-    if (ret_auth < 0) {
-      return ret_auth;
-    }
+  const int auth_ret = do_aws4_auth_completion();
+  if (auth_ret < 0) {
+    return auth_ret;
   }
   
   bufferptr in_ptr(data, len);
@@ -1096,15 +1238,15 @@ void RGWDeleteBucket_ObjStore_S3::send_response()
 
 int RGWPutObj_ObjStore_S3::get_params()
 {
+  if (!s->length)
+    return -ERR_LENGTH_REQUIRED;
+
   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
   map<string, bufferlist> src_attrs;
   size_t pos;
   int ret;
 
   RGWAccessControlPolicy_S3 s3policy(s->cct);
-  if (!s->length)
-    return -ERR_LENGTH_REQUIRED;
-
   ret = create_s3_policy(s, store, s3policy, s->owner);
   if (ret < 0)
     return ret;
@@ -1119,6 +1261,7 @@ int RGWPutObj_ObjStore_S3::get_params()
   /* handle x-amz-copy-source */
 
   if (copy_source) {
+    if (*copy_source == '/') ++copy_source;
     copy_source_bucket_name = copy_source;
     pos = copy_source_bucket_name.find("/");
     if (pos == std::string::npos) {
@@ -1131,10 +1274,10 @@ int RGWPutObj_ObjStore_S3::get_params()
 #define VERSION_ID_STR "?versionId="
     pos = copy_source_object_name.find(VERSION_ID_STR);
     if (pos == std::string::npos) {
-      url_decode(copy_source_object_name, copy_source_object_name);
+      copy_source_object_name = url_decode(copy_source_object_name);
     } else {
       copy_source_version_id = copy_source_object_name.substr(pos + sizeof(VERSION_ID_STR) - 1);
-      url_decode(copy_source_object_name.substr(0, pos), copy_source_object_name);
+      copy_source_object_name = url_decode(copy_source_object_name.substr(0, pos));
     }
     pos = copy_source_bucket_name.find(":");
     if (pos == std::string::npos) {
@@ -1183,161 +1326,29 @@ int RGWPutObj_ObjStore_S3::get_params()
 
   } /* copy_source */
 
-  return RGWPutObj_ObjStore::get_params();
-}
-
-int RGWPutObj_ObjStore_S3::validate_aws4_single_chunk(char *chunk_str,
-                                                      char *chunk_data_str,
-                                                      unsigned int chunk_data_size,
-                                                      string chunk_signature)
-{
-
-  /* string to sign */
-
-  string hash_empty_str;
-  rgw_hash_s3_string_sha256("", 0, hash_empty_str);
-
-  string hash_chunk_data;
-  rgw_hash_s3_string_sha256(chunk_data_str, chunk_data_size, hash_chunk_data);
-
-  string string_to_sign = "AWS4-HMAC-SHA256-PAYLOAD\n";
-  string_to_sign.append(s->aws4_auth->date + "\n");
-  string_to_sign.append(s->aws4_auth->credential_scope + "\n");
-  string_to_sign.append(s->aws4_auth->seed_signature + "\n");
-  string_to_sign.append(hash_empty_str + "\n");
-  string_to_sign.append(hash_chunk_data);
-
-  /* new chunk signature */
-
-  char signature_k[CEPH_CRYPTO_HMACSHA256_DIGESTSIZE];
-  calc_hmac_sha256(s->aws4_auth->signing_k, CEPH_CRYPTO_HMACSHA256_DIGESTSIZE,
-      string_to_sign.c_str(), string_to_sign.size(), signature_k);
-
-  char aux[CEPH_CRYPTO_HMACSHA256_DIGESTSIZE * 2 + 1];
-  buf_to_hex((unsigned char *) signature_k, CEPH_CRYPTO_HMACSHA256_DIGESTSIZE, aux);
-
-  string new_chunk_signature = string(aux);
-
-  ldout(s->cct, 20) << "--------------- aws4 chunk validation" << dendl;
-  ldout(s->cct, 20) << "chunk_signature     = " << chunk_signature << dendl;
-  ldout(s->cct, 20) << "new_chunk_signature = " << new_chunk_signature << dendl;
-  ldout(s->cct, 20) << "aws4 chunk signing_key    = " << s->aws4_auth->signing_key << dendl;
-  ldout(s->cct, 20) << "aws4 chunk string_to_sign = " << rgw::crypt_sanitize::log_content{string_to_sign.c_str()} << dendl;
-
-  /* chunk auth ok? */
-
-  if (new_chunk_signature != chunk_signature) {
-    ldout(s->cct, 20) << "ERROR: AWS4 chunk signature does NOT match (new_chunk_signature != chunk_signature)" << dendl;
-    return -ERR_SIGNATURE_NO_MATCH;
-  }
-
-  /* update seed signature */
-
-  s->aws4_auth->seed_signature = new_chunk_signature;
-
-  return 0;
-}
-
-int RGWPutObj_ObjStore_S3::validate_and_unwrap_available_aws4_chunked_data(bufferlist& bl_in,
-                                                                           bufferlist& bl_out)
-{
-
-  /* string(IntHexBase(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n */
-
-  const unsigned int chunk_str_min_len = 1 + 17 + 64 + 2; /* len('0') = 1 */
-
-  char *chunk_str = bl_in.c_str();
-  unsigned int budget = bl_in.length();
-
-  bl_out.clear();
-
-  while (true) {
-
-    /* check available metadata */
-
-    if (budget < chunk_str_min_len) {
-      return -ERR_SIGNATURE_NO_MATCH;
-    }
-
-    unsigned int chunk_offset = 0;
-
-    /* grab chunk size */
-
-    while ((chunk_offset < chunk_str_min_len) && (chunk_str[chunk_offset] != ';'))
-      chunk_offset++;
-    string str = string(chunk_str, chunk_offset);
-    unsigned int chunk_data_size;
-    stringstream ss;
-    ss << std::hex << str;
-    ss >> chunk_data_size;
-    if (ss.fail()) {
-      return -ERR_SIGNATURE_NO_MATCH;
-    }
-
-    /* grab chunk signature */
-
-    chunk_offset += 17;
-    string chunk_signature = string(chunk_str, chunk_offset, 64);
-
-    /* get chunk data */
-
-    chunk_offset += 64 + 2;
-    char *chunk_data_str = chunk_str + chunk_offset;
-
-    /* handle budget */
-
-    budget -= chunk_offset;
-    if (budget < chunk_data_size) {
-      return -ERR_SIGNATURE_NO_MATCH;
-    } else {
-      budget -= chunk_data_size;
-    }
-
-    /* auth single chunk */
+  /* handle object tagging */
+  auto tag_str = s->info.env->get("HTTP_X_AMZ_TAGGING");
+  if (tag_str){
+    obj_tags = ceph::make_unique<RGWObjTags>();
+    ret = obj_tags->set_from_string(tag_str);
+    if (ret < 0){
+      ldout(s->cct,0) << "setting obj tags failed with " << ret << dendl;
+      if (ret == -ERR_INVALID_TAG){
+        ret = -EINVAL; //s3 returns only -EINVAL for PUT requests
+      }
 
-    if (validate_aws4_single_chunk(chunk_str, chunk_data_str, chunk_data_size, chunk_signature) < 0) {
-      ldout(s->cct, 20) << "ERROR AWS4 single chunk validation" << dendl;
-      return -ERR_SIGNATURE_NO_MATCH;
+      return ret;
     }
-
-    /* aggregate single chunk */
-
-    bl_out.append(chunk_data_str, chunk_data_size);
-
-    /* last chunk or no more budget? */
-
-    if ((chunk_data_size == 0) || (budget == 0))
-      break;
-
-    /* next chunk */
-
-    chunk_offset += chunk_data_size;
-    chunk_str += chunk_offset;
   }
 
-  /* authorization ok */
-
-  return 0;
-
+  return RGWPutObj_ObjStore::get_params();
 }
 
 int RGWPutObj_ObjStore_S3::get_data(bufferlist& bl)
 {
-  int ret = RGWPutObj_ObjStore::get_data(bl);
-  if (ret < 0)
-    s->aws4_auth_needs_complete = false;
-
-  int ret_auth;
-
-  if (s->aws4_auth_streaming_mode && ret > 0) {
-    ret_auth = validate_and_unwrap_available_aws4_chunked_data(bl, s->aws4_auth->bl);
-    if (ret_auth < 0) {
-      return ret_auth;
-    }
-  }
-
-  if ((ret == 0) && s->aws4_auth_needs_complete) {
-    ret_auth = do_aws4_auth_completion();
+  const int ret = RGWPutObj_ObjStore::get_data(bl);
+  if (ret == 0) {
+    const int ret_auth = do_aws4_auth_completion();
     if (ret_auth < 0) {
       return ret_auth;
     }
@@ -1406,7 +1417,6 @@ static inline int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& o
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
-  read_op.params.perr = &s->err;
 
   return read_op.prepare();
 }
@@ -1571,7 +1581,7 @@ int RGWPostObj_ObjStore_S3::get_params()
     bool boundary;
     uint64_t chunk_size = s->cct->_conf->rgw_max_chunk_size;
     r = read_data(part.data, chunk_size, boundary, done);
-    if (!boundary) {
+    if (r < 0 || !boundary) {
       err_msg = "Couldn't find boundary";
       return -EINVAL;
     }
@@ -1598,7 +1608,11 @@ int RGWPostObj_ObjStore_S3::get_params()
   env.add_var("key", s->object.name);
 
   part_str(parts, "Content-Type", &content_type);
-  env.add_var("Content-Type", content_type);
+
+  /* AWS permits POST without Content-Type: http://tracker.ceph.com/issues/20201 */
+  if (! content_type.empty()) {
+    env.add_var("Content-Type", content_type);
+  }
 
   map<string, struct post_form_part, ltstr_nocase>::iterator piter =
     parts.upper_bound(RGW_AMZ_META_PREFIX);
@@ -1640,58 +1654,131 @@ int RGWPostObj_ObjStore_S3::get_params()
   if (r < 0)
     return r;
 
+  r = get_tags();
+  if (r < 0)
+    return r;
+
+
   min_len = post_policy.min_length;
   max_len = post_policy.max_length;
 
+
+
   return 0;
 }
 
-int RGWPostObj_ObjStore_S3::get_policy()
+int RGWPostObj_ObjStore_S3::get_tags()
 {
-  if (part_bl(parts, "policy", &s->auth.s3_postobj_creds.encoded_policy)) {
-    // check that the signature matches the encoded policy
-    if (!part_str(parts, "AWSAccessKeyId",
-                  &s->auth.s3_postobj_creds.access_key)) {
-      ldout(s->cct, 0) << "No S3 access key found!" << dendl;
-      err_msg = "Missing access key";
+  string tags_str;
+  if (part_str(parts, "tagging", &tags_str)) {
+    RGWObjTagsXMLParser parser;
+    if (!parser.init()){
+      ldout(s->cct, 0) << "Couldn't init RGWObjTags XML parser" << dendl;
+      err_msg = "Server couldn't process the request";
+      return -EINVAL; // TODO: This class of errors in rgw code should be a 5XX error
+    }
+    if (!parser.parse(tags_str.c_str(), tags_str.size(), 1)) {
+      ldout(s->cct,0 ) << "Invalid Tagging XML" << dendl;
+      err_msg = "Invalid Tagging XML";
       return -EINVAL;
     }
 
-    if (!part_str(parts, "signature", &s->auth.s3_postobj_creds.signature)) {
-      ldout(s->cct, 0) << "No signature found!" << dendl;
-      err_msg = "Missing signature";
-      return -EINVAL;
+    RGWObjTagSet_S3 *obj_tags_s3;
+    RGWObjTagging_S3 *tagging;
+
+    tagging = static_cast<RGWObjTagging_S3 *>(parser.find_first("Tagging"));
+    obj_tags_s3 = static_cast<RGWObjTagSet_S3 *>(tagging->find_first("TagSet"));
+    if(!obj_tags_s3){
+      return -ERR_MALFORMED_XML;
     }
 
-    /* FIXME: this is a makeshift solution. The browser upload authentication will be
-     * handled by an instance of rgw::auth::Completer spawned in Handler's authorize()
-     * method. */
-    const auto& strategy = auth_registry_ptr->get_s3_post();
-    try {
-      auto result = strategy.authenticate(s);
-      if (result.get_status() != decltype(result)::Status::GRANTED) {
-        return -EACCES;
+    RGWObjTags obj_tags;
+    int r = obj_tags_s3->rebuild(obj_tags);
+    if (r < 0)
+      return r;
+
+    bufferlist tags_bl;
+    obj_tags.encode(tags_bl);
+    ldout(s->cct, 20) << "Read " << obj_tags.count() << "tags" << dendl;
+    attrs[RGW_ATTR_TAGS] = tags_bl;
+  }
+
+
+  return 0;
+}
+
+int RGWPostObj_ObjStore_S3::get_policy()
+{
+  if (part_bl(parts, "policy", &s->auth.s3_postobj_creds.encoded_policy)) {
+    bool aws4_auth = false;
+
+    /* x-amz-algorithm handling */
+    using rgw::auth::s3::AWS4_HMAC_SHA256_STR;
+    if ((part_str(parts, "x-amz-algorithm", &s->auth.s3_postobj_creds.x_amz_algorithm)) &&
+        (s->auth.s3_postobj_creds.x_amz_algorithm == AWS4_HMAC_SHA256_STR)) {
+      ldout(s->cct, 0) << "Signature verification algorithm AWS v4 (AWS4-HMAC-SHA256)" << dendl;
+      aws4_auth = true;
+    } else {
+      ldout(s->cct, 0) << "Signature verification algorithm AWS v2" << dendl;
+    }
+
+    // check that the signature matches the encoded policy
+    if (aws4_auth) {
+      /* AWS4 */
+
+      /* x-amz-credential handling */
+      if (!part_str(parts, "x-amz-credential",
+                    &s->auth.s3_postobj_creds.x_amz_credential)) {
+        ldout(s->cct, 0) << "No S3 aws4 credential found!" << dendl;
+        err_msg = "Missing aws4 credential";
+        return -EINVAL;
+      }
+
+      /* x-amz-signature handling */
+      if (!part_str(parts, "x-amz-signature",
+                    &s->auth.s3_postobj_creds.signature)) {
+        ldout(s->cct, 0) << "No aws4 signature found!" << dendl;
+        err_msg = "Missing aws4 signature";
+        return -EINVAL;
       }
 
-      try {
-        auto applier = result.get_applier();
+      /* x-amz-date handling */
+      std::string received_date_str;
+      if (!part_str(parts, "x-amz-date", &received_date_str)) {
+        ldout(s->cct, 0) << "No aws4 date found!" << dendl;
+        err_msg = "Missing aws4 date";
+        return -EINVAL;
+      }
+    } else {
+      /* AWS2 */
 
-        applier->load_acct_info(*s->user);
-        s->perm_mask = applier->get_perm_mask();
-        applier->modify_request_state(s);
-        s->auth.identity = std::move(applier);
+      // check that the signature matches the encoded policy
+      if (!part_str(parts, "AWSAccessKeyId",
+                    &s->auth.s3_postobj_creds.access_key)) {
+        ldout(s->cct, 0) << "No S3 aws2 access key found!" << dendl;
+        err_msg = "Missing aws2 access key";
+        return -EINVAL;
+      }
 
-        s->owner.set_id(s->user->user_id);
-        s->owner.set_name(s->user->display_name);
-        /* OK, fall through. */
-      } catch (int err) {
-        return -EACCES;
+      if (!part_str(parts, "signature", &s->auth.s3_postobj_creds.signature)) {
+        ldout(s->cct, 0) << "No aws2 signature found!" << dendl;
+        err_msg = "Missing aws2 signature";
+        return -EINVAL;
       }
-    } catch (int err) {
-      return -EACCES;
     }
 
-    ldout(s->cct, 0) << "Successful Signature Verification!" << dendl;
+    /* FIXME: this is a makeshift solution. The browser upload authentication will be
+     * handled by an instance of rgw::auth::Completer spawned in Handler's authorize()
+     * method. */
+    const int ret = rgw::auth::Strategy::apply(auth_registry_ptr->get_s3_post(), s);
+    if (ret != 0) {
+      return -EACCES;
+    } else {
+      /* Populate the owner info. */
+      s->owner.set_id(s->user->user_id);
+      s->owner.set_name(s->user->display_name);
+      ldout(s->cct, 20) << "Successful Signature Verification!" << dendl;
+    }
 
     ceph::bufferlist decoded_policy;
     try {
@@ -1703,7 +1790,7 @@ int RGWPostObj_ObjStore_S3::get_policy()
     }
 
     decoded_policy.append('\0'); // NULL terminate
-    ldout(s->cct, 0) << "POST policy: " << decoded_policy.c_str() << dendl;
+    ldout(s->cct, 20) << "POST policy: " << decoded_policy.c_str() << dendl;
 
 
     int r = post_policy.from_json(decoded_policy, err_msg);
@@ -1715,9 +1802,15 @@ int RGWPostObj_ObjStore_S3::get_policy()
       return -EINVAL;
     }
 
-    post_policy.set_var_checked("AWSAccessKeyId");
+    if (aws4_auth) {
+      /* AWS4 */
+      post_policy.set_var_checked("x-amz-signature");
+    } else {
+      /* AWS2 */
+      post_policy.set_var_checked("AWSAccessKeyId");
+      post_policy.set_var_checked("signature");
+    }
     post_policy.set_var_checked("policy");
-    post_policy.set_var_checked("signature");
 
     r = post_policy.check(&env, err_msg);
     if (r < 0) {
@@ -1761,7 +1854,7 @@ int RGWPostObj_ObjStore_S3::complete_get_params()
     bool boundary;
     uint64_t chunk_size = s->cct->_conf->rgw_max_chunk_size;
     r = read_data(part.data, chunk_size, boundary, done);
-    if (!boundary) {
+    if (r < 0 || !boundary) {
       return -EINVAL;
     }
 
@@ -1925,9 +2018,7 @@ int RGWDeleteObj_ObjStore_S3::get_params()
   }
 
   if (if_unmod) {
-    string if_unmod_str(if_unmod);
-    string if_unmod_decoded;
-    url_decode(if_unmod_str, if_unmod_decoded);
+    std::string if_unmod_decoded = url_decode(if_unmod);
     uint64_t epoch;
     uint64_t nsec;
     if (utime_t::parse_date(if_unmod_decoded, &epoch, &nsec) < 0) {
@@ -2084,10 +2175,8 @@ void RGWGetACLs_ObjStore_S3::send_response()
 int RGWPutACLs_ObjStore_S3::get_params()
 {
   int ret =  RGWPutACLs_ObjStore::get_params();
-  if (ret < 0)
-    s->aws4_auth_needs_complete = false;
-  if (s->aws4_auth_needs_complete) {
-    int ret_auth = do_aws4_auth_completion();
+  if (ret >= 0) {
+    const int ret_auth = do_aws4_auth_completion();
     if (ret_auth < 0) {
       return ret_auth;
     }
@@ -2225,11 +2314,9 @@ int RGWPutCORS_ObjStore_S3::get_params()
 
   auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
 
-  if (s->aws4_auth_needs_complete) {
-    r = do_aws4_auth_completion();
-    if (r < 0) {
-      return r;
-    }
+  r = do_aws4_auth_completion();
+  if (r < 0) {
+    return r;
   }
 
   if (!parser.init()) {
@@ -2246,6 +2333,12 @@ int RGWPutCORS_ObjStore_S3::get_params()
     return -EINVAL;
   }
 
+  // forward bucket cors requests to meta master zone
+  if (!store->is_meta_master()) {
+    /* only need to keep this data around if we're not meta master */
+    in_data.append(data, len);
+  }
+
   if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
     ldout(s->cct, 15) << "CORSConfiguration";
     cors_config->to_xml(*_dout);
@@ -2434,13 +2527,7 @@ int RGWCompleteMultipart_ObjStore_S3::get_params()
     return ret;
   }
 
-  if (s->aws4_auth_needs_complete) {
-    int ret_auth = do_aws4_auth_completion();
-    if (ret_auth < 0) {
-      return ret_auth;
-    }
-  }
-  return 0;
+  return do_aws4_auth_completion();
 }
 
 void RGWCompleteMultipart_ObjStore_S3::send_response()
@@ -2604,13 +2691,7 @@ int RGWDeleteMultiObj_ObjStore_S3::get_params()
     return ret;
   }
 
-  if (s->aws4_auth_needs_complete) {
-    int ret_auth = do_aws4_auth_completion();
-    if (ret_auth < 0) {
-      return ret_auth;
-    }
-  }
-  return 0;
+  return do_aws4_auth_completion();
 }
 
 void RGWDeleteMultiObj_ObjStore_S3::send_status()
@@ -2654,7 +2735,7 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
       }
       s->formatter->close_section();
     } else if (op_ret < 0) {
-      struct rgw_http_errors r;
+      struct rgw_http_error r;
       int err_no;
 
       s->formatter->open_object_section("Error");
@@ -2712,9 +2793,118 @@ void RGWGetObjLayout_ObjStore_S3::send_response()
   rgw_flush_formatter(s, &f);
 }
 
-RGWOp *RGWHandler_REST_Service_S3::op_get()
+int RGWConfigBucketMetaSearch_ObjStore_S3::get_params()
 {
-  if (is_usage_op()) {
+  auto iter = s->info.x_meta_map.find("x-amz-meta-search");
+  if (iter == s->info.x_meta_map.end()) {
+    s->err.message = "X-Rgw-Meta-Search header not provided";
+    ldout(s->cct, 5) << s->err.message << dendl;
+    return -EINVAL;
+  }
+
+  list<string> expressions;
+  get_str_list(iter->second, ",", expressions);
+
+  for (auto& expression : expressions) {
+    vector<string> args;
+    get_str_vec(expression, ";", args);
+
+    if (args.empty()) {
+      s->err.message = "invalid empty expression";
+      ldout(s->cct, 5) << s->err.message << dendl;
+      return -EINVAL;
+    }
+    if (args.size() > 2) {
+      s->err.message = string("invalid expression: ") + expression;
+      ldout(s->cct, 5) << s->err.message << dendl;
+      return -EINVAL;
+    }
+
+    string key = boost::algorithm::to_lower_copy(rgw_trim_whitespace(args[0]));
+    string val;
+    if (args.size() > 1) {
+      val = boost::algorithm::to_lower_copy(rgw_trim_whitespace(args[1]));
+    }
+
+    if (!boost::algorithm::starts_with(key, RGW_AMZ_META_PREFIX)) {
+      s->err.message = string("invalid expression, key must start with '" RGW_AMZ_META_PREFIX "' : ") + expression;
+      ldout(s->cct, 5) << s->err.message << dendl;
+      return -EINVAL;
+    }
+
+    key = key.substr(sizeof(RGW_AMZ_META_PREFIX) - 1);
+
+    ESEntityTypeMap::EntityType entity_type;
+
+    if (val.empty() || val == "str" || val == "string") {
+      entity_type = ESEntityTypeMap::ES_ENTITY_STR;
+    } else if (val == "int" || val == "integer") {
+      entity_type = ESEntityTypeMap::ES_ENTITY_INT;
+    } else if (val == "date" || val == "datetime") {
+      entity_type = ESEntityTypeMap::ES_ENTITY_DATE;
+    } else {
+      s->err.message = string("invalid entity type: ") + val;
+      ldout(s->cct, 5) << s->err.message << dendl;
+      return -EINVAL;
+    }
+
+    mdsearch_config[key] = entity_type;
+  }
+
+  return 0;
+}
+
+void RGWConfigBucketMetaSearch_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this);
+}
+
+void RGWGetBucketMetaSearch_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, NULL, "application/xml");
+
+  Formatter *f = s->formatter;
+  f->open_array_section("GetBucketMetaSearchResult");
+  for (auto& e : s->bucket_info.mdsearch_config) {
+    f->open_object_section("Entry");
+    string k = string("x-amz-meta-") + e.first;
+    f->dump_string("Key", k.c_str());
+    const char *type;
+    switch (e.second) {
+      case ESEntityTypeMap::ES_ENTITY_INT:
+        type = "int";
+        break;
+      case ESEntityTypeMap::ES_ENTITY_DATE:
+        type = "date";
+        break;
+      default:
+        type = "str";
+    }
+    f->dump_string("Type", type);
+    f->close_section();
+  }
+  f->close_section();
+  rgw_flush_formatter(s, f);
+}
+
+void RGWDelBucketMetaSearch_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this);
+}
+
+
+RGWOp *RGWHandler_REST_Service_S3::op_get()
+{
+  if (is_usage_op()) {
     return new RGWGetUsage_ObjStore_S3;
   } else {
     return new RGWListBuckets_ObjStore_S3;
@@ -2755,10 +2945,11 @@ RGWOp *RGWHandler_REST_Service_S3::op_post()
 RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
 {
   // Non-website mode
-  if (get_data)
+  if (get_data) {
     return new RGWListBucket_ObjStore_S3;
-  else
+  } else {
     return new RGWStatBucket_ObjStore_S3;
+  }
 }
 
 RGWOp *RGWHandler_REST_Bucket_S3::op_get()
@@ -2779,6 +2970,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get()
     return new RGWGetBucketWebsite_ObjStore_S3;
   }
 
+  if (s->info.args.exists("mdsearch")) {
+    return new RGWGetBucketMetaSearch_ObjStore_S3;
+  }
+
   if (is_acl_op()) {
     return new RGWGetACLs_ObjStore_S3;
   } else if (is_cors_op()) {
@@ -2789,6 +2984,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get()
     return new RGWListBucketMultiparts_ObjStore_S3;
   } else if(is_lc_op()) {
     return new RGWGetLC_ObjStore_S3;
+  } else if(is_policy_op()) {
+    return new RGWGetBucketPolicy;
   }
   return get_obj_op(true);
 }
@@ -2823,6 +3020,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
     return new RGWSetRequestPayment_ObjStore_S3;
   } else if(is_lc_op()) {
     return new RGWPutLC_ObjStore_S3;
+  } else if(is_policy_op()) {
+    return new RGWPutBucketPolicy;
   }
   return new RGWCreateBucket_ObjStore_S3;
 }
@@ -2833,6 +3032,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
     return new RGWDeleteCORS_ObjStore_S3;
   } else if(is_lc_op()) {
     return new RGWDeleteLC_ObjStore_S3;
+  } else if(is_policy_op()) {
+    return new RGWDeleteBucketPolicy;
   }
 
   if (s->info.args.sub_resource_exists("website")) {
@@ -2842,6 +3043,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
     return new RGWDeleteBucketWebsite_ObjStore_S3;
   }
 
+  if (s->info.args.exists("mdsearch")) {
+    return new RGWDelBucketMetaSearch_ObjStore_S3;
+  }
+
   return new RGWDeleteBucket_ObjStore_S3;
 }
 
@@ -2851,6 +3056,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_post()
     return new RGWDeleteMultiObj_ObjStore_S3;
   }
 
+  if (s->info.args.exists("mdsearch")) {
+    return new RGWConfigBucketMetaSearch_ObjStore_S3;
+  }
+
   return new RGWPostObj_ObjStore_S3;
 }
 
@@ -2877,6 +3086,8 @@ RGWOp *RGWHandler_REST_Obj_S3::op_get()
     return new RGWListMultipart_ObjStore_S3;
   } else if (s->info.args.exists("layout")) {
     return new RGWGetObjLayout_ObjStore_S3;
+  } else if (is_tagging_op()) {
+    return new RGWGetObjTags_ObjStore_S3;
   }
   return get_obj_op(true);
 }
@@ -2895,7 +3106,10 @@ RGWOp *RGWHandler_REST_Obj_S3::op_put()
 {
   if (is_acl_op()) {
     return new RGWPutACLs_ObjStore_S3;
+  } else if (is_tagging_op()) {
+    return new RGWPutObjTags_ObjStore_S3;
   }
+
   if (s->init_state.src_bucket.empty())
     return new RGWPutObj_ObjStore_S3;
   else
@@ -2904,6 +3118,9 @@ RGWOp *RGWHandler_REST_Obj_S3::op_put()
 
 RGWOp *RGWHandler_REST_Obj_S3::op_delete()
 {
+  if (is_tagging_op()) {
+    return new RGWDeleteObjTags_ObjStore_S3;
+  }
   string upload_id = s->info.args.get("uploadId");
 
   if (upload_id.empty())
@@ -2920,7 +3137,7 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()
   if (s->info.args.exists("uploads"))
     return new RGWInitMultipart_ObjStore_S3;
 
-  return NULL;
+  return new RGWPostObj_ObjStore_S3;
 }
 
 RGWOp *RGWHandler_REST_Obj_S3::op_options()
@@ -2987,799 +3204,168 @@ int RGWHandler_REST_S3::init_from_header(struct req_state* s,
     }
   } else {
     s->object = rgw_obj_key(req_name, s->info.args.get("versionId"));
-  }
-  return 0;
-}
-
-int RGWHandler_REST_S3::postauth_init()
-{
-  struct req_init_state *t = &s->init_state;
-  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
-
-  rgw_parse_url_bucket(t->url_bucket, s->user->user_id.tenant,
-                     s->bucket_tenant, s->bucket_name);
-
-  dout(10) << "s->object=" << (!s->object.empty() ? s->object : rgw_obj_key("<NULL>"))
-           << " s->bucket=" << rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name) << dendl;
-
-  int ret;
-  ret = validate_tenant_name(s->bucket_tenant);
-  if (ret)
-    return ret;
-  if (!s->bucket_name.empty()) {
-    ret = valid_s3_bucket_name(s->bucket_name, relaxed_names);
-    if (ret)
-      return ret;
-    ret = validate_object_name(s->object.name);
-    if (ret)
-      return ret;
-  }
-
-  if (!t->src_bucket.empty()) {
-    rgw_parse_url_bucket(t->src_bucket, s->user->user_id.tenant,
-                       s->src_tenant_name, s->src_bucket_name);
-    ret = validate_tenant_name(s->src_tenant_name);
-    if (ret)
-      return ret;
-    ret = valid_s3_bucket_name(s->src_bucket_name, relaxed_names);
-    if (ret)
-      return ret;
-  }
-  return 0;
-}
-
-int RGWHandler_REST_S3::init(RGWRados *store, struct req_state *s,
-                             rgw::io::BasicClient *cio)
-{
-  int ret;
-
-  s->dialect = "s3";
-  
-  ret = validate_tenant_name(s->bucket_tenant);
-  if (ret)
-    return ret;
-  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
-  if (!s->bucket_name.empty()) {
-    ret = valid_s3_bucket_name(s->bucket_name, relaxed_names);
-    if (ret)
-      return ret;
-    ret = validate_object_name(s->object.name);
-    if (ret)
-      return ret;
-  }
-
-  const char *cacl = s->info.env->get("HTTP_X_AMZ_ACL");
-  if (cacl)
-    s->canned_acl = cacl;
-
-  s->has_acl_header = s->info.env->exists_prefix("HTTP_X_AMZ_GRANT");
-
-  const char *copy_source = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE");
-
-  if (copy_source && !s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE")) {
-    ret = RGWCopyObj::parse_copy_location(copy_source,
-                                          s->init_state.src_bucket,
-                                          s->src_object);
-    if (!ret) {
-      ldout(s->cct, 0) << "failed to parse copy location" << dendl;
-      return -EINVAL; // XXX why not -ERR_INVALID_BUCKET_NAME or -ERR_BAD_URL?
-    }
-  }
-
-  return RGWHandler_REST::init(store, s, cio);
-}
-
-static void init_anon_user(struct req_state *s)
-{
-  rgw_get_anon_user(*(s->user));
-  s->perm_mask = RGW_PERM_FULL_CONTROL;
-}
-
-/*
- * verify that a signed request comes from the keyholder
- * by checking the signature against our locally-computed version
- *
- * it tries AWS v4 before AWS v2
- */
-int RGW_Auth_S3::authorize(RGWRados* const store,
-                           const rgw::auth::StrategyRegistry& auth_registry,
-                           struct req_state* const s)
-{
-
-  /* neither keystone and rados enabled; warn and exit! */
-  if (!store->ctx()->_conf->rgw_s3_auth_use_rados &&
-      !store->ctx()->_conf->rgw_s3_auth_use_keystone &&
-      !store->ctx()->_conf->rgw_s3_auth_use_ldap) {
-    dout(0) << "WARNING: no authorization backend enabled! Users will never authenticate." << dendl;
-    return -EPERM;
-  }
-
-  if (s->op == OP_OPTIONS) {
-    init_anon_user(s);
-    return 0;
-  }
-
-  if (!s->http_auth || !(*s->http_auth)) {
-
-    /* AWS4 */
-
-    string algorithm = s->info.args.get("X-Amz-Algorithm");
-    if (algorithm.size()) {
-      if (algorithm != "AWS4-HMAC-SHA256") {
-        return -EPERM;
-      }
-      /* compute first aws4 signature (stick to the boto2 implementation) */
-      int err = authorize_v4(store, s);
-      if ((err==-ERR_SIGNATURE_NO_MATCH) && !store->ctx()->_conf->rgw_s3_auth_aws4_force_boto2_compat) {
-        /* compute second aws4 signature (no bugs supported) */
-        ldout(s->cct, 10) << "computing second aws4 signature..." << dendl;
-        return authorize_v4(store, s, false);
-      }
-      return err;
-    }
-
-    /* AWS2 */
-
-    string auth_id = s->info.args.get("AWSAccessKeyId");
-    if (auth_id.size()) {
-      return authorize_v2(store, auth_registry, s);
-    }
-
-    /* anonymous access */
-
-    init_anon_user(s);
-    return 0;
-
-  } else {
-    /* Authorization in Header */
-
-    /* AWS4 */
-
-    if (!strncmp(s->http_auth, "AWS4-HMAC-SHA256", 16)) {
-      return authorize_v4(store, s);
-    }
-
-    /* AWS2 */
-
-    if (!strncmp(s->http_auth, "AWS ", 4)) {
-      return authorize_v2(store, auth_registry, s);
-    }
-
-  }
-
-  return -EINVAL;
-}
-
-int RGW_Auth_S3::authorize_aws4_auth_complete(RGWRados *store, struct req_state *s)
-{
-  return authorize_v4_complete(store, s, "", false);
-}
-
-int RGW_Auth_S3::authorize_v4_complete(RGWRados *store, struct req_state *s, const string& request_payload, bool unsigned_payload)
-{
-  size_t pos;
-
-  /* craft canonical request */
-
-  string canonical_req;
-  string canonical_req_hash;
-
-  rgw_create_s3_v4_canonical_request(s, s->aws4_auth->canonical_uri, s->aws4_auth->canonical_qs,
-      s->aws4_auth->canonical_hdrs, s->aws4_auth->signed_hdrs, request_payload, unsigned_payload,
-      canonical_req, canonical_req_hash);
-
-  /* Validate x-amz-sha256 */
-
-  if (s->aws4_auth_needs_complete) {
-    const char *expected_request_payload_hash = s->info.env->get("HTTP_X_AMZ_CONTENT_SHA256");
-    if (expected_request_payload_hash &&
-        s->aws4_auth->payload_hash.compare(expected_request_payload_hash) != 0) {
-      ldout(s->cct, 10) << "ERROR: x-amz-content-sha256 does not match" << dendl;
-      return -ERR_AMZ_CONTENT_SHA256_MISMATCH;
-    }
-  }
-
-  /*
-   * create a string to sign
-   *
-   * http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
-   */
-
-  string string_to_sign;
-
-  rgw_create_s3_v4_string_to_sign(s->cct, "AWS4-HMAC-SHA256", s->aws4_auth->date, s->aws4_auth->credential_scope,
-      canonical_req_hash, string_to_sign);
-
-  /*
-   * calculate the AWS signature
-   *
-   * http://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
-   */
-
-  string cs_aux = s->aws4_auth->credential_scope;
-
-  string date_cs = cs_aux;
-  pos = date_cs.find("/");
-  date_cs = date_cs.substr(0, pos);
-  cs_aux = cs_aux.substr(pos + 1, cs_aux.length());
-
-  string region_cs = cs_aux;
-  pos = region_cs.find("/");
-  region_cs = region_cs.substr(0, pos);
-  cs_aux = cs_aux.substr(pos + 1, cs_aux.length());
-
-  string service_cs = cs_aux;
-  pos = service_cs.find("/");
-  service_cs = service_cs.substr(0, pos);
-
-  int err = rgw_calculate_s3_v4_aws_signature(s, s->aws4_auth->access_key_id, date_cs,
-      region_cs, service_cs, string_to_sign, s->aws4_auth->new_signature);
-
-  ldout(s->cct, 10) << "----------------------------- Verifying signatures" << dendl;
-  ldout(s->cct, 10) << "Signature     = " << s->aws4_auth->signature << dendl;
-  ldout(s->cct, 10) << "New Signature = " << s->aws4_auth->new_signature << dendl;
-  ldout(s->cct, 10) << "-----------------------------" << dendl;
-
-  if (err) {
-    return err;
-  }
-
-  s->aws4_auth->seed_signature = s->aws4_auth->new_signature;
-
-  return 0;
-
-}
-
-static inline bool is_base64_for_content_md5(unsigned char c) {
-  return (isalnum(c) || isspace(c) || (c == '+') || (c == '/') || (c == '='));
-}
-
-static bool char_needs_aws4_escaping(char c)
-{
-  if ((c >= 'a' && c <= 'z') ||
-      (c >= 'A' && c <= 'Z') ||
-      (c >= '0' && c <= '9')) {
-    return false;
-  }
-
-  switch (c) {
-    case '-':
-    case '_':
-    case '.':
-    case '~':
-      return false;
-  }
-  return true;
-}
-
-static void aws4_uri_encode(const string& src, string& dst)
-{
-  const char *p = src.c_str();
-  for (unsigned i = 0; i < src.size(); i++, p++) {
-    if (char_needs_aws4_escaping(*p)) {
-      rgw_uri_escape_char(*p, dst);
-      continue;
-    }
-
-    dst.append(p, 1);
-  }
-}
-
-static std::array<string, 3> aws4_presigned_required_keys = { "Credential", "SignedHeaders", "Signature" };
-
-/*
- * handle v4 signatures (rados auth only)
- */
-int RGW_Auth_S3::authorize_v4(RGWRados *store, struct req_state *s, bool force_boto2_compat /* = true */)
-{
-  string::size_type pos;
-  bool using_qs;
-  /* used for pre-signatured url, We shouldn't return -ERR_REQUEST_TIME_SKEWED when 
-     current time <= X-Amz-Expires */
-  bool qsr = false;
-
-  uint64_t now_req = 0;
-  uint64_t now = ceph_clock_now();
-
-  /* v4 requires rados auth */
-  if (!store->ctx()->_conf->rgw_s3_auth_use_rados) {
-    return -EPERM;
-  }
-
-  try {
-    s->aws4_auth = std::unique_ptr<rgw_aws4_auth>(new rgw_aws4_auth);
-  } catch (std::bad_alloc&) {
-    return -ENOMEM;
-  }
-
-  if ((!s->http_auth) || !(*s->http_auth)) {
-
-    /* auth ships with req params ... */
-
-    /* look for required params */
-
-    using_qs = true;
-    s->aws4_auth->credential = s->info.args.get("X-Amz-Credential");
-    if (s->aws4_auth->credential.size() == 0) {
-      return -EPERM;
-    }
-
-    s->aws4_auth->date = s->info.args.get("X-Amz-Date");
-    struct tm date_t;
-    if (!parse_iso8601(s->aws4_auth->date.c_str(), &date_t, NULL, false))
-      return -EPERM;
-
-    s->aws4_auth->expires = s->info.args.get("X-Amz-Expires");
-    if (!s->aws4_auth->expires.empty()) {
-      /* X-Amz-Expires provides the time period, in seconds, for which
-         the generated presigned URL is valid. The minimum value
-         you can set is 1, and the maximum is 604800 (seven days) */
-      time_t exp = atoll(s->aws4_auth->expires.c_str());
-      if ((exp < 1) || (exp > 7*24*60*60)) {
-        dout(10) << "NOTICE: exp out of range, exp = " << exp << dendl;
-        return -EPERM;
-      }
-      /* handle expiration in epoch time */
-      now_req = (uint64_t)internal_timegm(&date_t);
-      if (now >= now_req + exp) {
-        dout(10) << "NOTICE: now = " << now << ", now_req = " << now_req << ", exp = " << exp << dendl;
-        return -EPERM;
-      }
-      qsr = true;
-    }
-
-    if ((now_req < now - RGW_AUTH_GRACE_MINS * 60 ||
-       now_req > now + RGW_AUTH_GRACE_MINS * 60) && !qsr) {
-      dout(10) << "NOTICE: request time skew too big." << dendl;
-      dout(10) << "now_req = " << now_req << " now = " << now
-               << "; now - RGW_AUTH_GRACE_MINS=" 
-               << now - RGW_AUTH_GRACE_MINS * 60
-               << "; now + RGW_AUTH_GRACE_MINS="
-               << now + RGW_AUTH_GRACE_MINS * 60 << dendl;
-      return -ERR_REQUEST_TIME_SKEWED;
-    }
-
-    s->aws4_auth->signedheaders = s->info.args.get("X-Amz-SignedHeaders");
-    if (s->aws4_auth->signedheaders.size() == 0) {
-      return -EPERM;
-    }
-
-    s->aws4_auth->signature = s->info.args.get("X-Amz-Signature");
-    if (s->aws4_auth->signature.size() == 0) {
-      return -EPERM;
-    }
-
-  } else {
-
-    /* auth ships in headers ... */
-
-    /* ------------------------- handle Credential header */
-
-    using_qs = false;
-
-    string auth_str = s->http_auth;
-
-#define AWS4_HMAC_SHA256_STR "AWS4-HMAC-SHA256"
-#define CREDENTIALS_PREFIX_LEN (sizeof(AWS4_HMAC_SHA256_STR) - 1)
-    uint64_t min_len = CREDENTIALS_PREFIX_LEN + 1;
-    if (auth_str.length() < min_len) {
-      ldout(store->ctx(), 10) << "credentials string is too short" << dendl;
-      return -EINVAL;
-    }
-
-    list<string> auth_list;
-    get_str_list(auth_str.substr(min_len), ",", auth_list);
-
-    map<string, string> kv;
-
-    for (string& s : auth_list) {
-      string key, val;
-      int ret = parse_key_value(s, key, val);
-      if (ret < 0) {
-        ldout(store->ctx(), 10) << "NOTICE: failed to parse auth header (s=" << s << ")" << dendl;
-        return -EINVAL;
-      }
-      kv[key] = std::move(val);
-    }
-
-    for (string& k : aws4_presigned_required_keys) {
-      if (kv.find(k) == kv.end()) {
-        ldout(store->ctx(), 10) << "NOTICE: auth header missing key: " << k << dendl;
-        return -EINVAL;
-      }
-    }
-
-    s->aws4_auth->credential = std::move(kv["Credential"]);
-    s->aws4_auth->signedheaders = std::move(kv["SignedHeaders"]);
-    s->aws4_auth->signature = std::move(kv["Signature"]);
-
-    /* sig hex str */
-    dout(10) << "v4 signature format = " << s->aws4_auth->signature << dendl;
-
-    /* ------------------------- handle x-amz-date header */
-
-    /* grab date */
-
-    const char *d = s->info.env->get("HTTP_X_AMZ_DATE");
-    struct tm t;
-    if (!parse_iso8601(d, &t, NULL, false)) {
-      dout(10) << "error reading date via http_x_amz_date" << dendl;
-      return -EACCES;
-    }
-    s->aws4_auth->date = d;
-  }
-
-  /* AKIAIVKTAZLOCF43WNQD/AAAAMMDD/region/host/aws4_request */
-  dout(10) << "v4 credential format = " << s->aws4_auth->credential << dendl;
-
-  if (std::count(s->aws4_auth->credential.begin(), s->aws4_auth->credential.end(), '/') != 4) {
-    return -EINVAL;
-  }
-
-  /* credential must end with 'aws4_request' */
-  if (s->aws4_auth->credential.find("aws4_request") == std::string::npos) {
-    return -EINVAL;
-  }
-
-  /* grab access key id */
-
-  pos = s->aws4_auth->credential.find("/");
-  s->aws4_auth->access_key_id = s->aws4_auth->credential.substr(0, pos);
-
-  dout(10) << "access key id = " << s->aws4_auth->access_key_id << dendl;
-
-  /* grab credential scope */
-
-  s->aws4_auth->credential_scope = s->aws4_auth->credential.substr(pos + 1, s->aws4_auth->credential.length());
-
-  dout(10) << "credential scope = " << s->aws4_auth->credential_scope << dendl;
-
-  /* grab user information */
-
-  if (rgw_get_user_info_by_access_key(store, s->aws4_auth->access_key_id, *s->user) < 0) {
-    dout(10) << "error reading user info, uid=" << s->aws4_auth->access_key_id
-              << " can't authenticate" << dendl;
-    return -ERR_INVALID_ACCESS_KEY;
-  }
-
-  /*
-   * create a canonical request
-   *
-   * http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
-   */
-
-  /* craft canonical uri */
-
-  /* here code should normalize via rfc3986 but S3 does **NOT** do path normalization
-   * that SigV4 typically does. this code follows the same approach that boto library
-   * see auth.py:canonical_uri(...) */
-
-  s->aws4_auth->canonical_uri = s->info.request_uri_aws4;
-
-  if (s->aws4_auth->canonical_uri.empty()) {
-    s->aws4_auth->canonical_uri = "/";
-  } else {
-    boost::replace_all(s->aws4_auth->canonical_uri, "+", "%20");
-  }
-
-  /* craft canonical query string */
-
-  s->aws4_auth->canonical_qs = s->info.request_params;
-
-  if (!s->aws4_auth->canonical_qs.empty()) {
-
-    /* handle case when query string exists. Step 3 in
-     * http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html */
-
-    map<string, string> canonical_qs_map;
-    istringstream cqs(s->aws4_auth->canonical_qs);
-    string keyval;
-
-    while (getline(cqs, keyval, '&')) {
-      string key, val;
-      istringstream kv(keyval);
-      getline(kv, key, '=');
-      getline(kv, val, '=');
-      if (!using_qs || key != "X-Amz-Signature") {
-        string encoded_key;
-        string encoded_val;
-        if (key != "X-Amz-Credential") {
-          string key_decoded;
-          url_decode(key, key_decoded);
-          if (key.length() != key_decoded.length()) {
-            encoded_key = key;
-          } else {
-            aws4_uri_encode(key, encoded_key);
-          }
-          string val_decoded;
-          url_decode(val, val_decoded);
-          if (val.length() != val_decoded.length()) {
-            encoded_val = val;
-          } else {
-            aws4_uri_encode(val, encoded_val);
-          }
-        } else {
-          encoded_key = key;
-          encoded_val = val;
-        }
-        canonical_qs_map[encoded_key] = encoded_val;
-      }
-    }
-
-    s->aws4_auth->canonical_qs = "";
-
-    map<string, string>::iterator last = canonical_qs_map.end();
-    --last;
-
-    for (map<string, string>::iterator it = canonical_qs_map.begin();
-        it != canonical_qs_map.end(); ++it) {
-      s->aws4_auth->canonical_qs.append(it->first + "=" + it->second);
-      if (it != last) {
-        s->aws4_auth->canonical_qs.append("&");
-      }
-    }
-
-  }
-
-  /* craft canonical headers */
-
-  map<string, string> canonical_hdrs_map;
-  istringstream sh(s->aws4_auth->signedheaders);
-  string token;
-  string port = s->info.env->get("SERVER_PORT", "");
-  string secure_port = s->info.env->get("SERVER_PORT_SECURE", "");
-
-  while (getline(sh, token, ';')) {
-    string token_env = "HTTP_" + token;
-    transform(token_env.begin(), token_env.end(), token_env.begin(), ::toupper);
-    replace(token_env.begin(), token_env.end(), '-', '_');
-    if (token_env == "HTTP_CONTENT_LENGTH") {
-      token_env = "CONTENT_LENGTH";
-    }
-    if (token_env == "HTTP_CONTENT_TYPE") {
-      token_env = "CONTENT_TYPE";
-    }
-    const char *t = s->info.env->get(token_env.c_str());
-    if (!t) {
-      dout(10) << "warning env var not available" << dendl;
-      continue;
-    }
-    if (token_env == "HTTP_CONTENT_MD5") {
-      for (const char *p = t; *p; p++) {
-       if (!is_base64_for_content_md5(*p)) {
-         dout(0) << "NOTICE: bad content-md5 provided (not base64), aborting request p=" << *p << " " << (int)*p << dendl;
-         return -EPERM;
-       }
-      }
-    }
-    string token_value = string(t);
-    if (force_boto2_compat && using_qs && (token == "host")) {
-      if (!secure_port.empty()) {
-       if (secure_port != "443")
-         token_value = token_value + ":" + secure_port;
-      } else if (!port.empty()) {
-       if (port != "80")
-         token_value = token_value + ":" + port;
-      }
-    }
-    canonical_hdrs_map[token] = rgw_trim_whitespace(token_value);
-  }
-
-  for (map<string, string>::iterator it = canonical_hdrs_map.begin();
-      it != canonical_hdrs_map.end(); ++it) {
-    s->aws4_auth->canonical_hdrs.append(it->first + ":" + it->second + "\n");
-  }
-
-  dout(10) << "canonical headers format = " << s->aws4_auth->canonical_hdrs << dendl;
-
-  /* craft signed headers */
-
-  s->aws4_auth->signed_hdrs = s->aws4_auth->signedheaders;
-
-  /* handle request payload */
-
-  s->aws4_auth->payload_hash = "";
-
-  string request_payload;
-
-  bool unsigned_payload = false;
-  s->aws4_auth_streaming_mode = false;
-
-  if (using_qs) {
-    /* query parameters auth */
-    unsigned_payload = true;
-  } else {
-    /* header auth */
-    const char *request_payload_hash = s->info.env->get("HTTP_X_AMZ_CONTENT_SHA256");
-    if (request_payload_hash) {
-      unsigned_payload = string("UNSIGNED-PAYLOAD").compare(request_payload_hash) == 0;
-      if (!unsigned_payload) {
-        s->aws4_auth_streaming_mode = string("STREAMING-AWS4-HMAC-SHA256-PAYLOAD").compare(request_payload_hash) == 0;
-      }
-    }
-  }
-
-  /* from rfc2616 - 4.3 Message Body
-   *
-   * "The presence of a message-body in a request is signaled by the inclusion of a
-   *  Content-Length or Transfer-Encoding header field in the request's message-headers."
-   */
-  bool body_available = s->content_length != 0 || s->info.env->get("HTTP_TRANSFER_ENCODING") != NULL;
-
-  if (unsigned_payload || !body_available) {
-
-    /* requests lacking of body or shipping with 'UNSIGNED-PAYLOAD' are authenticated now */
-
-    /* complete aws4 auth */
-
-    int err = authorize_v4_complete(store, s, request_payload, unsigned_payload);
-    if (err) {
-      return err;
-    }
-
-    /* verify signature */
-
-    if (s->aws4_auth->signature != s->aws4_auth->new_signature) {
-      return -ERR_SIGNATURE_NO_MATCH;
-    }
-
-    /* authorization ok */
-
-    dout(10) << "v4 auth ok" << dendl;
-
-    /* aws4 auth completed */
-
-    s->aws4_auth_needs_complete = false;
-
-  } else {
-
-    /* aws4 auth not completed... delay aws4 auth */
-
-    if (!s->aws4_auth_streaming_mode) {
-
-      dout(10) << "delaying v4 auth" << dendl;
+  }
+  return 0;
+}
 
-      /* payload in a single chunk */
+int RGWHandler_REST_S3::postauth_init()
+{
+  struct req_init_state *t = &s->init_state;
+  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
 
-      switch (s->op_type)
-      {
-        case RGW_OP_CREATE_BUCKET:
-        case RGW_OP_PUT_OBJ:
-        case RGW_OP_PUT_ACLS:
-        case RGW_OP_PUT_CORS:
-        case RGW_OP_COMPLETE_MULTIPART:
-        case RGW_OP_SET_BUCKET_VERSIONING:
-        case RGW_OP_DELETE_MULTI_OBJ:
-        case RGW_OP_ADMIN_SET_METADATA:
-        case RGW_OP_SET_BUCKET_WEBSITE:
-          break;
-        default:
-          dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
-          return -ERR_NOT_IMPLEMENTED;
-      }
+  rgw_parse_url_bucket(t->url_bucket, s->user->user_id.tenant,
+                     s->bucket_tenant, s->bucket_name);
 
-      s->aws4_auth_needs_complete = true;
+  dout(10) << "s->object=" << (!s->object.empty() ? s->object : rgw_obj_key("<NULL>"))
+           << " s->bucket=" << rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name) << dendl;
 
-    } else {
+  int ret;
+  ret = rgw_validate_tenant_name(s->bucket_tenant);
+  if (ret)
+    return ret;
+  if (!s->bucket_name.empty()) {
+    ret = valid_s3_bucket_name(s->bucket_name, relaxed_names);
+    if (ret)
+      return ret;
+    ret = validate_object_name(s->object.name);
+    if (ret)
+      return ret;
+  }
 
-      dout(10) << "body content detected in multiple chunks" << dendl;
+  if (!t->src_bucket.empty()) {
+    rgw_parse_url_bucket(t->src_bucket, s->user->user_id.tenant,
+                       s->src_tenant_name, s->src_bucket_name);
+    ret = rgw_validate_tenant_name(s->src_tenant_name);
+    if (ret)
+      return ret;
+    ret = valid_s3_bucket_name(s->src_bucket_name, relaxed_names);
+    if (ret)
+      return ret;
+  }
+  return 0;
+}
 
-      /* payload in multiple chunks */
+int RGWHandler_REST_S3::init(RGWRados *store, struct req_state *s,
+                             rgw::io::BasicClient *cio)
+{
+  int ret;
 
-      switch(s->op_type)
-      {
-        case RGW_OP_PUT_OBJ:
-          break;
-        default:
-          dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED (streaming mode)" << dendl;
-          return -ERR_NOT_IMPLEMENTED;
-      }
+  s->dialect = "s3";
 
-      /* calculate seed */
+  ret = rgw_validate_tenant_name(s->bucket_tenant);
+  if (ret)
+    return ret;
+  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
+  if (!s->bucket_name.empty()) {
+    ret = valid_s3_bucket_name(s->bucket_name, relaxed_names);
+    if (ret)
+      return ret;
+    ret = validate_object_name(s->object.name);
+    if (ret)
+      return ret;
+  }
 
-      int err = authorize_v4_complete(store, s, "", unsigned_payload);
-      if (err) {
-        return err;
-      }
+  const char *cacl = s->info.env->get("HTTP_X_AMZ_ACL");
+  if (cacl)
+    s->canned_acl = cacl;
 
-      /* verify seed signature */
+  s->has_acl_header = s->info.env->exists_prefix("HTTP_X_AMZ_GRANT");
 
-      if (s->aws4_auth->signature != s->aws4_auth->new_signature) {
-        dout(10) << "ERROR: AWS4 seed signature does NOT match!" << dendl;
-        return -ERR_SIGNATURE_NO_MATCH;
-      }
+  const char *copy_source = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE");
 
-      dout(10) << "aws4 seed signature ok... delaying v4 auth" << dendl;
+  if (copy_source && !s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE")) {
+    ret = RGWCopyObj::parse_copy_location(copy_source,
+                                          s->init_state.src_bucket,
+                                          s->src_object);
+    if (!ret) {
+      ldout(s->cct, 0) << "failed to parse copy location" << dendl;
+      return -EINVAL; // XXX why not -ERR_INVALID_BUCKET_NAME or -ERR_BAD_URL?
+    }
+  }
 
-      s->aws4_auth_needs_complete = false;
+  return RGWHandler_REST::init(store, s, cio);
+}
 
-    }
+enum class AwsVersion {
+  UNKOWN,
+  V2,
+  V4
+};
 
-  }
+enum class AwsRoute {
+  UNKOWN,
+  QUERY_STRING,
+  HEADERS
+};
 
-  map<string, RGWAccessKey>::iterator iter = s->user->access_keys.find(s->aws4_auth->access_key_id);
-  if (iter == s->user->access_keys.end()) {
-    dout(0) << "ERROR: access key not encoded in user info" << dendl;
-    return -EPERM;
-  }
+static inline std::pair<AwsVersion, AwsRoute>
+discover_aws_flavour(const req_info& info)
+{
+  using rgw::auth::s3::AWS4_HMAC_SHA256_STR;
 
-  RGWAccessKey& k = iter->second;
+  AwsVersion version = AwsVersion::UNKOWN;
+  AwsRoute route = AwsRoute::UNKOWN;
 
-  if (!k.subuser.empty()) {
-    map<string, RGWSubUser>::iterator uiter = s->user->subusers.find(k.subuser);
-    if (uiter == s->user->subusers.end()) {
-      dout(0) << "NOTICE: could not find subuser: " << k.subuser << dendl;
-      return -EPERM;
+  const char* http_auth = info.env->get("HTTP_AUTHORIZATION");
+  if (http_auth && http_auth[0]) {
+    /* Authorization in Header */
+    route = AwsRoute::HEADERS;
+
+    if (!strncmp(http_auth, AWS4_HMAC_SHA256_STR,
+                 strlen(AWS4_HMAC_SHA256_STR))) {
+      /* AWS v4 */
+      version = AwsVersion::V4;
+    } else if (!strncmp(http_auth, "AWS ", 4)) {
+      /* AWS v2 */
+      version = AwsVersion::V2;
     }
-    RGWSubUser& subuser = uiter->second;
-    s->perm_mask = subuser.perm_mask;
   } else {
-    s->perm_mask = RGW_PERM_FULL_CONTROL;
-  }
-
-  if (s->user->system) {
-    s->system_request = true;
-    dout(20) << "system request" << dendl;
-    s->info.args.set_system();
-    string euid = s->info.args.get(RGW_SYS_PARAM_PREFIX "uid");
-    rgw_user effective_uid(euid);
-    RGWUserInfo effective_user;
-    if (!effective_uid.empty()) {
-      int ret = rgw_get_user_info_by_uid(store, effective_uid, effective_user);
-      if (ret < 0) {
-        ldout(s->cct, 0) << "User lookup failed!" << dendl;
-        return -EACCES;
-      }
-      *(s->user) = effective_user;
+    route = AwsRoute::QUERY_STRING;
+
+    if (info.args.get("X-Amz-Algorithm") == AWS4_HMAC_SHA256_STR) {
+      /* AWS v4 */
+      version = AwsVersion::V4;
+    } else if (!info.args.get("AWSAccessKeyId").empty()) {
+      /* AWS v2 */
+      version = AwsVersion::V2;
     }
   }
 
-  // populate the owner info
-  s->owner.set_id(s->user->user_id);
-  s->owner.set_name(s->user->display_name);
+  return std::make_pair(version, route);
+}
 
-  return 0;
+static void init_anon_user(struct req_state *s)
+{
+  rgw_get_anon_user(*(s->user));
+  s->perm_mask = RGW_PERM_FULL_CONTROL;
 }
 
 /*
- * handle v2 signatures
+ * verify that a signed request comes from the keyholder
+ * by checking the signature against our locally-computed version
+ *
+ * it tries AWS v4 before AWS v2
  */
-int RGW_Auth_S3::authorize_v2(RGWRados* const store,
-                              const rgw::auth::StrategyRegistry& auth_registry,
-                              struct req_state* const s)
+int RGW_Auth_S3::authorize(RGWRados* const store,
+                           const rgw::auth::StrategyRegistry& auth_registry,
+                           struct req_state* const s)
 {
-  const auto& auth_strategy = auth_registry.get_s3_main();
-  try {
-    auto result = auth_strategy.authenticate(s);
-    if (result.get_status() != decltype(result)::Status::GRANTED) {
-      ldout(s->cct, 5) << "Failed the S3 auth strategy, reason="
-                       << result.get_reason() << dendl;
-      return result.get_reason();
-    }
-    try {
-      auto applier = result.get_applier();
-
-      applier->load_acct_info(*s->user);
-      s->perm_mask = applier->get_perm_mask();
-      applier->modify_request_state(s);
-      s->auth.identity = std::move(applier);
 
-      /* Populate the owner info. */
-      s->owner.set_id(s->user->user_id);
-      s->owner.set_name(s->user->display_name);
-
-      /* Success - not throwed. */
-      return 0;
-    } catch (const int err) {
-      ldout(s->cct, 5) << "applier threw err=" << err << dendl;
-      return err;
-    }
-  } catch (const int err) {
-    ldout(s->cct, 5) << "local auth engine threw err=" << err << dendl;
-    return err;
+  /* neither keystone and rados enabled; warn and exit! */
+  if (!store->ctx()->_conf->rgw_s3_auth_use_rados &&
+      !store->ctx()->_conf->rgw_s3_auth_use_keystone &&
+      !store->ctx()->_conf->rgw_s3_auth_use_ldap) {
+    dout(0) << "WARNING: no authorization backend enabled! Users will never authenticate." << dendl;
+    return -EPERM;
   }
 
-  return -ERR_SIGNATURE_NO_MATCH;
+  const auto ret = rgw::auth::Strategy::apply(auth_registry.get_s3_main(), s);
+  if (ret == 0) {
+    /* Populate the owner info. */
+    s->owner.set_id(s->user->user_id);
+    s->owner.set_name(s->user->display_name);
+  }
+  return ret;
 }
 
 int RGWHandler_Auth_S3::init(RGWRados *store, struct req_state *state,
@@ -3830,6 +3416,31 @@ RGWHandler_REST* RGWRESTMgr_S3::get_handler(struct req_state* const s,
   return handler;
 }
 
+bool RGWHandler_REST_S3Website::web_dir() const {
+  std::string subdir_name = url_decode(s->object.name);
+
+  if (subdir_name.empty()) {
+    return false;
+  } else if (subdir_name.back() == '/') {
+    subdir_name.pop_back();
+  }
+
+  rgw_obj obj(s->bucket, subdir_name);
+
+  RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+  obj_ctx.obj.set_atomic(obj);
+  obj_ctx.obj.set_prefetch_data(obj);
+
+  RGWObjState* state = nullptr;
+  if (store->get_obj_state(&obj_ctx, s->bucket_info, obj, &state, false) < 0) {
+    return false;
+  }
+  if (! state->exists) {
+    return false;
+  }
+  return state->exists;
+}
+
 int RGWHandler_REST_S3Website::retarget(RGWOp* op, RGWOp** new_op) {
   *new_op = op;
   ldout(s->cct, 10) << __func__ << "Starting retarget" << dendl;
@@ -3851,7 +3462,7 @@ int RGWHandler_REST_S3Website::retarget(RGWOp* op, RGWOp** new_op) {
   }
 
   rgw_obj_key new_obj;
-  s->bucket_info.website_conf.get_effective_key(s->object.name, &new_obj.name);
+  s->bucket_info.website_conf.get_effective_key(s->object.name, &new_obj.name, web_dir());
   ldout(s->cct, 10) << "retarget get_effective_key " << s->object << " -> "
                    << new_obj << dendl;
 
@@ -3969,11 +3580,11 @@ int RGWHandler_REST_S3Website::serve_errordoc(int http_ret, const string& errord
 int RGWHandler_REST_S3Website::error_handler(int err_no,
                                            string* error_content) {
   int new_err_no = -1;
-  const struct rgw_http_errors* r;
+  rgw_http_errors::const_iterator r = rgw_http_s3_errors.find(err_no > 0 ? err_no : -err_no);
   int http_error_code = -1;
-  r = search_err(err_no > 0 ? err_no : -err_no, RGW_HTTP_ERRORS, ARRAY_LEN(RGW_HTTP_ERRORS));
-  if (r) {
-    http_error_code = r->http_ret;
+
+  if (r != rgw_http_s3_errors.end()) {
+    http_error_code = r->second.first;
   }
   ldout(s->cct, 10) << "RGWHandler_REST_S3Website::error_handler err_no=" << err_no << " http_ret=" << http_error_code << dendl;
 
@@ -4053,8 +3664,8 @@ namespace rgw {
 namespace auth {
 namespace s3 {
 
-bool rgw::auth::s3::RGWS3V2Extractor::is_time_skew_ok(const utime_t& header_time,
-                                                      const bool qsr) const
+bool AWSGeneralAbstractor::is_time_skew_ok(const utime_t& header_time,
+                                           const bool qsr) const
 {
   /* Check for time skew first. */
   const time_t req_sec = header_time.sec();
@@ -4080,25 +3691,246 @@ bool rgw::auth::s3::RGWS3V2Extractor::is_time_skew_ok(const utime_t& header_time
   }
 }
 
-std::tuple<Version2ndEngine::Extractor::access_key_id_t,
-           Version2ndEngine::Extractor::signature_t,
-           Version2ndEngine::Extractor::string_to_sign_t>
-rgw::auth::s3::RGWS3V2Extractor::get_auth_data(const req_state* const s) const
+
+static rgw::auth::Completer::cmplptr_t
+null_completer_factory(const boost::optional<std::string>& secret_key)
+{
+  return nullptr;
+}
+
+
+AWSEngine::VersionAbstractor::auth_data_t
+AWSGeneralAbstractor::get_auth_data(const req_state* const s) const
+{
+  AwsVersion version;
+  AwsRoute route;
+  std::tie(version, route) = discover_aws_flavour(s->info);
+
+  if (version == AwsVersion::V2) {
+    return get_auth_data_v2(s);
+  } else if (version == AwsVersion::V4) {
+    return get_auth_data_v4(s, route == AwsRoute::QUERY_STRING);
+  } else {
+    /* FIXME(rzarzynski): handle anon user. */
+    throw -EINVAL;
+  }
+}
+
+boost::optional<std::string>
+AWSGeneralAbstractor::get_v4_canonical_headers(
+  const req_info& info,
+  const boost::string_view& signedheaders,
+  const bool using_qs) const
+{
+  return rgw::auth::s3::get_v4_canonical_headers(info, signedheaders,
+                                                 using_qs, false);
+}
+
+AWSEngine::VersionAbstractor::auth_data_t
+AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
+                                       /* FIXME: const. */
+                                       bool using_qs) const
+{
+  boost::string_view access_key_id;
+  boost::string_view signed_hdrs;
+
+  boost::string_view date;
+  boost::string_view credential_scope;
+  boost::string_view client_signature;
+
+  int ret = rgw::auth::s3::parse_credentials(s->info,
+                                             access_key_id,
+                                             credential_scope,
+                                             signed_hdrs,
+                                             client_signature,
+                                             date,
+                                             using_qs);
+  if (ret < 0) {
+    throw ret;
+  }
+
+  /* craft canonical headers */
+  boost::optional<std::string> canonical_headers = \
+    get_v4_canonical_headers(s->info, signed_hdrs, using_qs);
+  if (canonical_headers) {
+    ldout(s->cct, 10) << "canonical headers format = " << *canonical_headers
+                      << dendl;
+  } else {
+    throw -EPERM;
+  }
+
+  /* Get the expected hash. */
+  auto exp_payload_hash = rgw::auth::s3::get_v4_exp_payload_hash(s->info);
+
+  /* Craft canonical URI. Using std::move later so let it be non-const. */
+  auto canonical_uri = rgw::auth::s3::get_v4_canonical_uri(s->info);
+
+  /* Craft canonical query string. std::moving later so non-const here. */
+  auto canonical_qs = rgw::auth::s3::get_v4_canonical_qs(s->info, using_qs);
+
+  /* Craft canonical request. */
+  auto canonical_req_hash = \
+    rgw::auth::s3::get_v4_canon_req_hash(s->cct,
+                                         s->info.method,
+                                         std::move(canonical_uri),
+                                         std::move(canonical_qs),
+                                         std::move(*canonical_headers),
+                                         signed_hdrs,
+                                         exp_payload_hash);
+
+  auto string_to_sign = \
+    rgw::auth::s3::get_v4_string_to_sign(s->cct,
+                                         AWS4_HMAC_SHA256_STR,
+                                         date,
+                                         credential_scope,
+                                         std::move(canonical_req_hash));
+
+  const auto sig_factory = std::bind(rgw::auth::s3::get_v4_signature,
+                                     credential_scope,
+                                     std::placeholders::_1,
+                                     std::placeholders::_2,
+                                     std::placeholders::_3);
+
+  /* Requests authenticated with the Query Parameters are treated as unsigned.
+   * From "Authenticating Requests: Using Query Parameters (AWS Signature
+   * Version 4)":
+   *
+   *   You don't include a payload hash in the Canonical Request, because
+   *   when you create a presigned URL, you don't know the payload content
+   *   because the URL is used to upload an arbitrary payload. Instead, you
+   *   use a constant string UNSIGNED-PAYLOAD.
+   *
+   * This means we have absolutely no business in spawning completer. Both
+   * aws4_auth_needs_complete and aws4_auth_streaming_mode are set to false
+   * by default. We don't need to change that. */
+  if (is_v4_payload_unsigned(exp_payload_hash) || is_v4_payload_empty(s)) {
+    return {
+      access_key_id,
+      client_signature,
+      std::move(string_to_sign),
+      sig_factory,
+      null_completer_factory
+    };
+  } else {
+    /* We're going to handle a signed payload. Be aware that even empty HTTP
+     * body (no payload) requires verification:
+     *
+     *   The x-amz-content-sha256 header is required for all AWS Signature
+     *   Version 4 requests. It provides a hash of the request payload. If
+     *   there is no payload, you must provide the hash of an empty string. */
+    if (!is_v4_payload_streamed(exp_payload_hash)) {
+      ldout(s->cct, 10) << "delaying v4 auth" << dendl;
+
+      /* payload in a single chunk */
+      switch (s->op_type)
+      {
+        case RGW_OP_CREATE_BUCKET:
+        case RGW_OP_PUT_OBJ:
+        case RGW_OP_PUT_ACLS:
+        case RGW_OP_PUT_CORS:
+        case RGW_OP_INIT_MULTIPART: // in case that Init Multipart uses CHUNK encoding
+        case RGW_OP_COMPLETE_MULTIPART:
+        case RGW_OP_SET_BUCKET_VERSIONING:
+        case RGW_OP_DELETE_MULTI_OBJ:
+        case RGW_OP_ADMIN_SET_METADATA:
+        case RGW_OP_SET_BUCKET_WEBSITE:
+        case RGW_OP_PUT_BUCKET_POLICY:
+        case RGW_OP_PUT_OBJ_TAGGING:
+        case RGW_OP_PUT_LC:
+          break;
+        default:
+          dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
+          throw -ERR_NOT_IMPLEMENTED;
+      }
+
+      const auto cmpl_factory = std::bind(AWSv4ComplSingle::create,
+                                          s,
+                                          std::placeholders::_1);
+      return {
+        access_key_id,
+        client_signature,
+        std::move(string_to_sign),
+        sig_factory,
+        cmpl_factory
+      };
+    } else {
+      /* IMHO "streamed" doesn't fit too good here. I would prefer to call
+       * it "chunked" but let's be coherent with Amazon's terminology. */
+
+      dout(10) << "body content detected in multiple chunks" << dendl;
+
+      /* payload in multiple chunks */
+
+      switch(s->op_type)
+      {
+        case RGW_OP_PUT_OBJ:
+          break;
+        default:
+          dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED (streaming mode)" << dendl;
+          throw -ERR_NOT_IMPLEMENTED;
+      }
+
+      dout(10) << "aws4 seed signature ok... delaying v4 auth" << dendl;
+
+      /* In the case of streamed payload client sets the x-amz-content-sha256
+       * to "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" but uses "UNSIGNED-PAYLOAD"
+       * when constructing the Canonical Request. */
+
+      /* In the case of single-chunk upload client set the header's value is
+       * coherent with the one used for Canonical Request crafting. */
+
+      /* In the case of query string-based authentication there should be no
+       * x-amz-content-sha256 header and the value "UNSIGNED-PAYLOAD" is used
+       * for CanonReq. */
+      const auto cmpl_factory = std::bind(AWSv4ComplMulti::create,
+                                          s,
+                                          date,
+                                          credential_scope,
+                                          client_signature,
+                                          std::placeholders::_1);
+      return {
+        access_key_id,
+        client_signature,
+        std::move(string_to_sign),
+        sig_factory,
+        cmpl_factory
+      };
+    }
+  }
+}
+
+
+boost::optional<std::string>
+AWSGeneralBoto2Abstractor::get_v4_canonical_headers(
+  const req_info& info,
+  const boost::string_view& signedheaders,
+  const bool using_qs) const
+{
+  return rgw::auth::s3::get_v4_canonical_headers(info, signedheaders,
+                                                 using_qs, true);
+}
+
+
+AWSEngine::VersionAbstractor::auth_data_t
+AWSGeneralAbstractor::get_auth_data_v2(const req_state* const s) const
 {
-  std::string access_key_id;
-  std::string signature;
+  boost::string_view access_key_id;
+  boost::string_view signature;
   bool qsr = false;
 
-  if (! s->http_auth || s->http_auth[0] == '\0') {
+  const char* http_auth = s->info.env->get("HTTP_AUTHORIZATION");
+  if (! http_auth || http_auth[0] == '\0') {
     /* Credentials are provided in query string. We also need to verify
      * the "Expires" parameter now. */
     access_key_id = s->info.args.get("AWSAccessKeyId");
     signature = s->info.args.get("Signature");
     qsr = true;
 
-    std::string expires = s->info.args.get("Expires");
+    boost::string_view expires = s->info.args.get("Expires");
     if (! expires.empty()) {
-      const time_t exp = atoll(expires.c_str());
+      /* It looks we have the guarantee that expires is a null-terminated,
+       * and thus string_view::data() can be safely used. */
+      const time_t exp = atoll(expires.data());
       time_t now;
       time(&now);
 
@@ -4108,9 +3940,9 @@ rgw::auth::s3::RGWS3V2Extractor::get_auth_data(const req_state* const s) const
     }
   } else {
     /* The "Authorization" HTTP header is being used. */
-    const std::string auth_str(s->http_auth + strlen("AWS "));
+    const boost::string_view auth_str(http_auth + strlen("AWS "));
     const size_t pos = auth_str.rfind(':');
-    if (pos != std::string::npos) {
+    if (pos != boost::string_view::npos) {
       access_key_id = auth_str.substr(0, pos);
       signature = auth_str.substr(pos + 1);
     }
@@ -4133,9 +3965,87 @@ rgw::auth::s3::RGWS3V2Extractor::get_auth_data(const req_state* const s) const
     throw -ERR_REQUEST_TIME_SKEWED;
   }
 
-  return std::make_tuple(std::move(access_key_id),
-                         std::move(signature),
-                         std::move(string_to_sign));
+  return {
+    std::move(access_key_id),
+    std::move(signature),
+    std::move(string_to_sign),
+    rgw::auth::s3::get_v2_signature,
+    null_completer_factory
+  };
+}
+
+
+AWSEngine::VersionAbstractor::auth_data_t
+AWSBrowserUploadAbstractor::get_auth_data_v2(const req_state* const s) const
+{
+  return {
+    s->auth.s3_postobj_creds.access_key,
+    s->auth.s3_postobj_creds.signature,
+    s->auth.s3_postobj_creds.encoded_policy.to_str(),
+    rgw::auth::s3::get_v2_signature,
+    null_completer_factory
+  };
+}
+
+AWSEngine::VersionAbstractor::auth_data_t
+AWSBrowserUploadAbstractor::get_auth_data_v4(const req_state* const s) const
+{
+  const boost::string_view credential = s->auth.s3_postobj_creds.x_amz_credential;
+
+  /* grab access key id */
+  const size_t pos = credential.find("/");
+  const boost::string_view access_key_id = credential.substr(0, pos);
+  dout(10) << "access key id = " << access_key_id << dendl;
+
+  /* grab credential scope */
+  const boost::string_view credential_scope = credential.substr(pos + 1);
+  dout(10) << "credential scope = " << credential_scope << dendl;
+
+  const auto sig_factory = std::bind(rgw::auth::s3::get_v4_signature,
+                                     credential_scope,
+                                     std::placeholders::_1,
+                                     std::placeholders::_2,
+                                     std::placeholders::_3);
+
+  return {
+    access_key_id,
+    s->auth.s3_postobj_creds.signature,
+    s->auth.s3_postobj_creds.encoded_policy.to_str(),
+    sig_factory,
+    null_completer_factory
+  };
+}
+
+AWSEngine::VersionAbstractor::auth_data_t
+AWSBrowserUploadAbstractor::get_auth_data(const req_state* const s) const
+{
+  if (s->auth.s3_postobj_creds.x_amz_algorithm == AWS4_HMAC_SHA256_STR) {
+    ldout(s->cct, 0) << "Signature verification algorithm AWS v4"
+                     << " (AWS4-HMAC-SHA256)" << dendl;
+    return get_auth_data_v4(s);
+  } else {
+    ldout(s->cct, 0) << "Signature verification algorithm AWS v2" << dendl;
+    return get_auth_data_v2(s);
+  }
+}
+
+
+AWSEngine::result_t
+AWSEngine::authenticate(const req_state* const s) const
+{
+  /* Small reminder: an ver_abstractor is allowed to throw! */
+  const auto auth_data = ver_abstractor.get_auth_data(s);
+
+  if (auth_data.access_key_id.empty() || auth_data.client_signature.empty()) {
+    return result_t::deny(-EINVAL);
+  } else {
+    return authenticate(auth_data.access_key_id,
+                       auth_data.client_signature,
+                       auth_data.string_to_sign,
+                        auth_data.signature_factory,
+                       auth_data.completer_factory,
+                       s);
+  }
 }
 
 } /* namespace s3 */
@@ -4192,10 +4102,13 @@ rgw::auth::s3::LDAPEngine::get_creds_info(const rgw::RGWToken& token) const noex
 }
 
 rgw::auth::Engine::result_t
-rgw::auth::s3::LDAPEngine::authenticate(const std::string& access_key_id,
-                                        const std::string& signature,
-                                        const std::string& string_to_sign,
-                                        const req_state* const s) const
+rgw::auth::s3::LDAPEngine::authenticate(
+  const boost::string_view& access_key_id,
+  const boost::string_view& signature,
+  const string_to_sign_t& string_to_sign,
+  const signature_factory_t&,
+  const completer_factory_t& completer_factory,
+  const req_state* const s) const
 {
   /* boost filters and/or string_ref may throw on invalid input */
   rgw::RGWToken base64_token;
@@ -4227,19 +4140,24 @@ rgw::auth::s3::LDAPEngine::authenticate(const std::string& access_key_id,
 
   auto apl = apl_factory->create_apl_remote(cct, s, get_acl_strategy(),
                                             get_creds_info(base64_token));
-  return result_t::grant(std::move(apl));
+  return result_t::grant(std::move(apl), completer_factory(boost::none));
 }
 
 
-/* LocalVersion2ndEngine */
+/* LocalEndgine */
 rgw::auth::Engine::result_t
-rgw::auth::s3::LocalVersion2ndEngine::authenticate(const std::string& access_key_id,
-                                                   const std::string& signature,
-                                                   const std::string& string_to_sign,
-                                                   const req_state* const s) const
+rgw::auth::s3::LocalEngine::authenticate(
+  const boost::string_view& _access_key_id,
+  const boost::string_view& signature,
+  const string_to_sign_t& string_to_sign,
+  const signature_factory_t& signature_factory,
+  const completer_factory_t& completer_factory,
+  const req_state* const s) const
 {
   /* get the user info */
   RGWUserInfo user_info;
+  /* TODO(rzarzynski): we need to have string-view taking variant. */
+  const std::string access_key_id = _access_key_id.to_string();
   if (rgw_get_user_info_by_access_key(store, access_key_id, user_info) < 0) {
       ldout(cct, 5) << "error reading user info, uid=" << access_key_id
               << " can't authenticate" << dendl;
@@ -4248,7 +4166,8 @@ rgw::auth::s3::LocalVersion2ndEngine::authenticate(const std::string& access_key
   //TODO: Uncomment, when we have a migration plan in place.
   /*else {
     if (s->user->type != TYPE_RGW) {
-      ldout(cct, 10) << "ERROR: User id of type: " << s->user->type << " is present" << dendl;
+      ldout(cct, 10) << "ERROR: User id of type: " << s->user->type
+                     << " is present" << dendl;
       throw -EPERM;
     }
   }*/
@@ -4260,21 +4179,34 @@ rgw::auth::s3::LocalVersion2ndEngine::authenticate(const std::string& access_key
   }
   const RGWAccessKey& k = iter->second;
 
-  std::string digest;
-  int ret = rgw_get_s3_header_digest(string_to_sign, k.key, digest);
-  if (ret < 0) {
-    return result_t::deny(-EPERM);
-  }
+  const VersionAbstractor::server_signature_t server_signature = \
+    signature_factory(cct, k.key, string_to_sign);
 
-  ldout(cct, 15) << "string_to_sign=" << rgw::crypt_sanitize::log_content{string_to_sign.c_str()} << dendl;
-  ldout(cct, 15) << "calculated digest=" << digest << dendl;
-  ldout(cct, 15) << "auth signature=" << signature << dendl;
-  ldout(cct, 15) << "compare=" << signature.compare(digest) << dendl;
+  ldout(cct, 15) << "string_to_sign="
+                 << rgw::crypt_sanitize::log_content{string_to_sign}
+                 << dendl;
+  ldout(cct, 15) << "server signature=" << server_signature << dendl;
+  ldout(cct, 15) << "client signature=" << signature << dendl;
+  ldout(cct, 15) << "compare=" << signature.compare(server_signature) << dendl;
 
-  if (signature != digest) {
+  if (static_cast<boost::string_view>(server_signature) != signature) {
     return result_t::deny(-ERR_SIGNATURE_NO_MATCH);
   }
 
   auto apl = apl_factory->create_apl_local(cct, s, user_info, k.subuser);
-  return result_t::grant(std::move(apl));
+  return result_t::grant(std::move(apl), completer_factory(k.key));
+}
+
+bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
+  const req_state* s
+) const noexcept {
+  if (s->op == OP_OPTIONS) {
+    return true;
+  }
+
+  AwsVersion version;
+  AwsRoute route;
+  std::tie(version, route) = discover_aws_flavour(s->info);
+
+  return route == AwsRoute::QUERY_STRING && version == AwsVersion::UNKOWN;
 }