]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rest_s3.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_rest_s3.cc
index 9c7eabbaeeb1fa500eaaab77beb1b1c9366a71f2..1cb855d09e306d166925b06c7b7896aefffb045a 100644 (file)
@@ -1,23 +1,38 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <errno.h>
 #include <array>
 #include <string.h>
+#include <string_view>
 
 #include "common/ceph_crypto.h"
+#include "common/split.h"
 #include "common/Formatter.h"
 #include "common/utf8.h"
 #include "common/ceph_json.h"
 #include "common/safe_io.h"
-#include "common/backport14.h"
+#include "common/errno.h"
+#include "auth/Crypto.h"
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/replace.hpp>
-#include <boost/utility/string_view.hpp>
+#include <boost/tokenizer.hpp>
+#define BOOST_BIND_GLOBAL_PLACEHOLDERS
+#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
+#endif
+#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
+#pragma clang diagnostic pop
+#endif
+#undef BOOST_BIND_GLOBAL_PLACEHOLDERS
+
+#include <liboath/oath.h>
 
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
 #include "rgw_rest_s3website.h"
+#include "rgw_rest_pubsub.h"
 #include "rgw_auth_s3.h"
 #include "rgw_acl.h"
 #include "rgw_policy_s3.h"
 #include "rgw_rest_role.h"
 #include "rgw_crypt.h"
 #include "rgw_crypt_sanitize.h"
+#include "rgw_rest_user_policy.h"
+#include "rgw_zone.h"
+#include "rgw_bucket_sync.h"
 
-#include "include/assert.h"
+#include "services/svc_zone.h"
+#include "services/svc_cls.h"
+
+#include "include/ceph_assert.h"
+#include "rgw_role.h"
+#include "rgw_rest_sts.h"
+#include "rgw_rest_iam.h"
+#include "rgw_sts.h"
+#include "rgw_sal_rados.h"
+
+#include "rgw_s3select.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+using namespace std;
 using namespace rgw;
 using namespace ceph::crypto;
 
-using std::get;
-
 void list_all_buckets_start(struct req_state *s)
 {
   s->formatter->open_array_section_in_ns("ListAllMyBucketsResult", XMLNS_AWS_S3);
@@ -62,11 +89,11 @@ void list_all_buckets_end(struct req_state *s)
   s->formatter->close_section();
 }
 
-void dump_bucket(struct req_state *s, RGWBucketEnt& obj)
+void dump_bucket(struct req_state *s, rgw::sal::Bucket& obj)
 {
   s->formatter->open_object_section("Bucket");
-  s->formatter->dump_string("Name", obj.bucket.name);
-  dump_time(s, "CreationDate", &obj.creation_time);
+  s->formatter->dump_string("Name", obj.get_name());
+  dump_time(s, "CreationDate", obj.get_creation_time());
   s->formatter->close_section();
 }
 
@@ -83,6 +110,22 @@ void rgw_get_errno_s3(rgw_http_error *e , int err_no)
   }
 }
 
+static inline std::string get_s3_expiration_header(
+  struct req_state* s,
+  const ceph::real_time& mtime)
+{
+  return rgw::lc::s3_expiration_header(
+    s, s->object->get_key(), s->tagset, mtime, s->bucket_attrs);
+}
+
+static inline bool get_s3_multipart_abort_header(
+  struct req_state* s, const ceph::real_time& mtime,
+  ceph::real_time& date, std::string& rule_id)
+{
+  return rgw::lc::s3_multipart_abort_header(
+          s, s->object->get_key(), mtime, s->bucket_attrs, date, rule_id);
+}
+
 struct response_attr_param {
   const char *param;
   const char *http_attr;
@@ -103,9 +146,9 @@ int RGWGetObj_ObjStore_S3Website::send_response_data(bufferlist& bl, off_t bl_of
   iter = attrs.find(RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION);
   if (iter != attrs.end()) {
     bufferlist &bl = iter->second;
-    s->redirect = string(bl.c_str(), bl.length());
+    s->redirect = bl.c_str();
     s->err.http_ret = 301;
-    ldout(s->cct, 20) << __CEPH_ASSERT_FUNCTION << " redirecting per x-amz-website-redirect-location=" << s->redirect << dendl;
+    ldpp_dout(this, 20) << __CEPH_ASSERT_FUNCTION << " redirecting per x-amz-website-redirect-location=" << s->redirect << dendl;
     op_ret = -ERR_WEBSITE_REDIRECT;
     set_req_state_err(s, op_ret);
     dump_errno(s);
@@ -118,12 +161,12 @@ int RGWGetObj_ObjStore_S3Website::send_response_data(bufferlist& bl, off_t bl_of
   }
 }
 
-int RGWGetObj_ObjStore_S3Website::send_response_data_error()
+int RGWGetObj_ObjStore_S3Website::send_response_data_error(optional_yield y)
 {
-  return RGWGetObj_ObjStore_S3::send_response_data_error();
+  return RGWGetObj_ObjStore_S3::send_response_data_error(y);
 }
 
-int RGWGetObj_ObjStore_S3::get_params()
+int RGWGetObj_ObjStore_S3::get_params(optional_yield y)
 {
   // for multisite sync requests, only read the slo manifest itself, rather than
   // all of the data from its parts. the parts will sync as separate objects
@@ -135,10 +178,10 @@ int RGWGetObj_ObjStore_S3::get_params()
     skip_decrypt = s->info.args.exists(RGW_SYS_PARAM_PREFIX "skip-decrypt");
   }
 
-  return RGWGetObj_ObjStore::get_params();
+  return RGWGetObj_ObjStore::get_params(y);
 }
 
-int RGWGetObj_ObjStore_S3::send_response_data_error()
+int RGWGetObj_ObjStore_S3::send_response_data_error(optional_yield y)
 {
   bufferlist bl;
   return send_response_data(bl, 0 , 0);
@@ -157,15 +200,24 @@ int decode_attr_bl_single_value(map<string, bufferlist>& attrs, const char *attr
     *result = def_val;
     return 0;
   }
-  bufferlist::iterator bliter = bl.begin();
+  auto bliter = bl.cbegin();
   try {
-    ::decode(*result, bliter);
+    decode(*result, bliter);
   } catch (buffer::error& err) {
     return -EIO;
   }
   return 0;
 }
 
+inline bool str_has_cntrl(const std::string s) {
+  return std::any_of(s.begin(), s.end(), ::iscntrl);
+}
+
+inline bool str_has_cntrl(const char* s) {
+  std::string _s(s);
+  return str_has_cntrl(_s);
+}
+
 int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
                                              off_t bl_len)
 {
@@ -175,6 +227,8 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
   map<string, string>::iterator riter;
   bufferlist metadata_bl;
 
+  string expires = get_s3_expiration_header(s, lastmod);
+
   if (sent_header)
     goto send_data;
 
@@ -226,14 +280,14 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
     uint64_t pg_ver = 0;
     int r = decode_attr_bl_single_value(attrs, RGW_ATTR_PG_VER, &pg_ver, (uint64_t)0);
     if (r < 0) {
-      ldout(s->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
+      ldpp_dout(this, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
     }
     dump_header(s, "Rgwx-Obj-PG-Ver", pg_ver);
 
     uint32_t source_zone_short_id = 0;
     r = decode_attr_bl_single_value(attrs, RGW_ATTR_SOURCE_ZONE, &source_zone_short_id, (uint32_t)0);
     if (r < 0) {
-      ldout(s->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
+      ldpp_dout(this, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
     }
     if (source_zone_short_id != 0) {
       dump_header(s, "Rgwx-Source-Zone-Short-Id", source_zone_short_id);
@@ -245,10 +299,15 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
 
   dump_content_length(s, total_len);
   dump_last_modified(s, lastmod);
-  if (!version_id.empty()) {
-    dump_header(s, "x-amz-version-id", version_id);
+  dump_header_if_nonempty(s, "x-amz-version-id", version_id);
+  dump_header_if_nonempty(s, "x-amz-expiration", expires);
+
+  if (attrs.find(RGW_ATTR_APPEND_PART_NUM) != attrs.end()) {
+    dump_header(s, "x-rgw-object-type", "Appendable");
+    dump_header(s, "x-rgw-next-append-position", s->obj_size);
+  } else {
+    dump_header(s, "x-rgw-object-type", "Normal");
   }
-  
 
   if (! op_ret) {
     if (! lo_etag.empty()) {
@@ -260,7 +319,7 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
     } else {
       auto iter = attrs.find(RGW_ATTR_ETAG);
       if (iter != attrs.end()) {
-        dump_etag(s, iter->second);
+        dump_etag(s, iter->second.to_str());
       }
     }
 
@@ -268,6 +327,24 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
       bool exists;
       string val = s->info.args.get(p->param, &exists);
       if (exists) {
+       /* reject unauthenticated response header manipulation, see
+        * https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html */
+       if (s->auth.identity->is_anonymous()) {
+         return -ERR_INVALID_REQUEST;
+       }
+        /* HTTP specification says no control characters should be present in
+         * header values: https://tools.ietf.org/html/rfc7230#section-3.2
+         *      field-vchar    = VCHAR / obs-text
+         *
+         * Failure to validate this permits a CRLF injection in HTTP headers,
+         * whereas S3 GetObject only permits specific headers.
+         */
+        if(str_has_cntrl(val)) {
+          /* TODO: return a more distinct error in future;
+           * stating what the problem is */
+          return -ERR_INVALID_REQUEST;
+        }
+
        if (strcmp(p->param, "response-content-type") != 0) {
          response_attrs[p->http_attr] = val;
        } else {
@@ -283,15 +360,23 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
       if (aiter != rgw_to_http_attrs.end()) {
         if (response_attrs.count(aiter->second) == 0) {
           /* Was not already overridden by a response param. */
-          response_attrs[aiter->second] = iter->second.c_str();
+
+          size_t len = iter->second.length();
+          string s(iter->second.c_str(), len);
+          while (len && !s[len - 1]) {
+            --len;
+            s.resize(len);
+          }
+          response_attrs[aiter->second] = s;
         }
       } else if (iter->first.compare(RGW_ATTR_CONTENT_TYPE) == 0) {
         /* Special handling for content_type. */
         if (!content_type) {
-          content_type = iter->second.c_str();
+          content_type_str = rgw_bl_str(iter->second);
+          content_type = content_type_str.c_str();
         }
       } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) {
-        // this attr has an extra length prefix from ::encode() in prior versions
+        // this attr has an extra length prefix from encode() in prior versions
         dump_header(s, "X-Object-Meta-Static-Large-Object", "True");
       } else if (strncmp(name, RGW_ATTR_META_PREFIX,
                         sizeof(RGW_ATTR_META_PREFIX)-1) == 0) {
@@ -301,12 +386,30 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
       } else if (iter->first.compare(RGW_ATTR_TAGS) == 0) {
         RGWObjTags obj_tags;
         try{
-          bufferlist::iterator it = iter->second.begin();
+          auto it = iter->second.cbegin();
           obj_tags.decode(it);
         } catch (buffer::error &err) {
-          ldout(s->cct,0) << "Error caught buffer::error couldn't decode TagSet " << dendl;
+          ldpp_dout(this,0) << "Error caught buffer::error couldn't decode TagSet " << dendl;
         }
         dump_header(s, RGW_AMZ_TAG_COUNT, obj_tags.count());
+      } else if (iter->first.compare(RGW_ATTR_OBJECT_RETENTION) == 0 && get_retention){
+        RGWObjectRetention retention;
+        try {
+          decode(retention, iter->second);
+          dump_header(s, "x-amz-object-lock-mode", retention.get_mode());
+          string date = ceph::to_iso_8601(retention.get_retain_until_date());
+          dump_header(s, "x-amz-object-lock-retain-until-date", date.c_str());
+        } catch (buffer::error& err) {
+          ldpp_dout(this, 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
+        }
+      } else if (iter->first.compare(RGW_ATTR_OBJECT_LEGAL_HOLD) == 0 && get_legal_hold) {
+        RGWObjectLegalHold legal_hold;
+        try {
+          decode(legal_hold, iter->second);
+          dump_header(s, "x-amz-object-lock-legal-hold",legal_hold.get_status());
+        } catch (buffer::error& err) {
+          ldpp_dout(this, 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl;
+        }
       }
     }
   }
@@ -341,7 +444,7 @@ send_data:
   return 0;
 }
 
-int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetDataCB> *filter, RGWGetDataCB* cb, bufferlist* manifest_bl)
+int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter> *filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl)
 {
   if (skip_decrypt) { // bypass decryption for multisite sync requests
     return 0;
@@ -352,20 +455,57 @@ int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetDataCB> *fil
   res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses);
   if (res == 0) {
     if (block_crypt != nullptr) {
-      auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt)));
-      //RGWGetObj_BlockDecrypt* f = new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt));
-      if (f != nullptr) {
-        if (manifest_bl != nullptr) {
-          res = f->read_manifest(*manifest_bl);
-          if (res == 0) {
-            *filter = std::move(f);
-          }
+      auto f = std::make_unique<RGWGetObj_BlockDecrypt>(s, s->cct, cb, std::move(block_crypt));
+      if (manifest_bl != nullptr) {
+        res = f->read_manifest(this, *manifest_bl);
+        if (res == 0) {
+          *filter = std::move(f);
         }
       }
     }
   }
   return res;
 }
+int RGWGetObj_ObjStore_S3::verify_requester(const rgw::auth::StrategyRegistry& auth_registry, optional_yield y) 
+{
+  int ret = -EINVAL;
+  ret = RGWOp::verify_requester(auth_registry, y);
+  if(!s->user->get_caps().check_cap("amz-cache", RGW_CAP_READ) && !ret && s->info.env->exists("HTTP_X_AMZ_CACHE"))
+    ret = override_range_hdr(auth_registry, y);
+  return ret;
+}
+
+int RGWGetObj_ObjStore_S3::override_range_hdr(const rgw::auth::StrategyRegistry& auth_registry, optional_yield y)
+{
+  int ret = -EINVAL;
+  ldpp_dout(this, 10) << "cache override headers" << dendl;
+  RGWEnv* rgw_env = const_cast<RGWEnv *>(s->info.env);
+  const char* backup_range = rgw_env->get("HTTP_RANGE");
+  const char hdrs_split[2] = {(char)178,'\0'};
+  const char kv_split[2] = {(char)177,'\0'};
+  const char* cache_hdr = rgw_env->get("HTTP_X_AMZ_CACHE");
+  for (std::string_view hdr : ceph::split(cache_hdr, hdrs_split)) {
+    auto kv = ceph::split(hdr, kv_split);
+    auto k = kv.begin();
+    if (std::distance(k, kv.end()) != 2) {
+      return -EINVAL;
+    }
+    auto v = std::next(k);
+    std::string key = "HTTP_";
+    key.append(*k);
+    boost::replace_all(key, "-", "_");
+    rgw_env->set(std::move(key), std::string(*v));
+    ldpp_dout(this, 10) << "after splitting cache kv key: " << key  << " " << rgw_env->get(key.c_str())  << dendl;
+  }
+  ret = RGWOp::verify_requester(auth_registry, y);
+  if(!ret && backup_range) {
+    rgw_env->set("HTTP_RANGE",backup_range);
+  } else {
+    rgw_env->remove("HTTP_RANGE");
+  }
+  return ret;
+}
+
 
 void RGWGetObjTags_ObjStore_S3::send_response_data(bufferlist& bl)
 {
@@ -377,11 +517,11 @@ void RGWGetObjTags_ObjStore_S3::send_response_data(bufferlist& bl)
   s->formatter->open_object_section("TagSet");
   if (has_tags){
     RGWObjTagSet_S3 tagset;
-    bufferlist::iterator iter = bl.begin();
+    auto iter = bl.cbegin();
     try {
       tagset.decode(iter);
     } catch (buffer::error& err) {
-      ldout(s->cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+      ldpp_dout(this,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
       op_ret= -EIO;
       return;
     }
@@ -393,45 +533,43 @@ void RGWGetObjTags_ObjStore_S3::send_response_data(bufferlist& bl)
 }
 
 
-int RGWPutObjTags_ObjStore_S3::get_params()
+int RGWPutObjTags_ObjStore_S3::get_params(optional_yield y)
 {
-  RGWObjTagsXMLParser parser;
+  RGWXMLParser parser;
 
   if (!parser.init()){
     return -EINVAL;
   }
 
-  char *data=nullptr;
-  int len=0;
-
   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-  int r = rgw_rest_read_all_input(s, &data, &len, max_size, false);
+
+  int r = 0;
+  bufferlist data;
+  std::tie(r, data) = read_all_input(s, max_size, false);
 
   if (r < 0)
     return r;
 
-  auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
-
-  if (!parser.parse(data, len, 1)) {
+  if (!parser.parse(data.c_str(), data.length(), 1)) {
     return -ERR_MALFORMED_XML;
   }
 
-  RGWObjTagSet_S3 *obj_tags_s3;
-  RGWObjTagging_S3 *tagging;
+  RGWObjTagging_S3 tagging;
 
-  tagging = static_cast<RGWObjTagging_S3 *>(parser.find_first("Tagging"));
-  obj_tags_s3 = static_cast<RGWObjTagSet_S3 *>(tagging->find_first("TagSet"));
-  if(!obj_tags_s3){
+  try {
+    RGWXMLDecoder::decode_xml("Tagging", tagging, &parser);
+  } catch (RGWXMLDecoder::err& err) {
+    ldpp_dout(this, 5) << "Malformed tagging request: " << err << dendl;
     return -ERR_MALFORMED_XML;
   }
 
   RGWObjTags obj_tags;
-  r = obj_tags_s3->rebuild(obj_tags);
+  r = tagging.rebuild(obj_tags);
   if (r < 0)
     return r;
 
   obj_tags.encode(tags_bl);
-  ldout(s->cct, 20) << "Read " << obj_tags.count() << "tags" << dendl;
+  ldpp_dout(this, 20) << "Read " << obj_tags.count() << "tags" << dendl;
 
   return 0;
 }
@@ -459,33 +597,699 @@ void RGWDeleteObjTags_ObjStore_S3::send_response()
   end_header(s, this);
 }
 
+void RGWGetBucketTags_ObjStore_S3::send_response_data(bufferlist& bl)
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  if (!op_ret) {
+  s->formatter->open_object_section_in_ns("Tagging", XMLNS_AWS_S3);
+  s->formatter->open_object_section("TagSet");
+  if (has_tags){
+    RGWObjTagSet_S3 tagset;
+    auto iter = bl.cbegin();
+    try {
+      tagset.decode(iter);
+    } catch (buffer::error& err) {
+      ldpp_dout(this,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+      op_ret= -EIO;
+      return;
+    }
+    tagset.dump_xml(s->formatter);
+  }
+  s->formatter->close_section();
+  s->formatter->close_section();
+  rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+}
+
+int RGWPutBucketTags_ObjStore_S3::get_params(const DoutPrefixProvider *dpp, optional_yield y)
+{
+  RGWXMLParser parser;
+
+  if (!parser.init()){
+    return -EINVAL;
+  }
+
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  int r = 0;
+  bufferlist data;
+
+  std::tie(r, data) = read_all_input(s, max_size, false);
+
+  if (r < 0)
+    return r;
+
+  if (!parser.parse(data.c_str(), data.length(), 1)) {
+    return -ERR_MALFORMED_XML;
+  }
+
+  RGWObjTagging_S3 tagging;
+  try {
+    RGWXMLDecoder::decode_xml("Tagging", tagging, &parser);
+  } catch (RGWXMLDecoder::err& err) {
+
+    ldpp_dout(dpp, 5) << "Malformed tagging request: " << err << dendl;
+    return -ERR_MALFORMED_XML;
+  }
+
+  RGWObjTags obj_tags(50); // A tag set can contain as many as 50 tags, or it can be empty.
+  r = tagging.rebuild(obj_tags);
+  if (r < 0)
+    return r;
+
+  obj_tags.encode(tags_bl);
+  ldpp_dout(dpp, 20) << "Read " << obj_tags.count() << "tags" << dendl;
+
+  // forward bucket tags requests to meta master zone
+  if (!store->is_meta_master()) {
+    /* only need to keep this data around if we're not meta master */
+    in_data = std::move(data);
+  }
+
+  return 0;
+}
+
+void RGWPutBucketTags_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
+void RGWDeleteBucketTags_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
+namespace {
+
+bool is_valid_status(const string& s) {
+  return (s == "Enabled" ||
+          s == "Disabled");
+}
+
+static string enabled_group_id = "s3-bucket-replication:enabled";
+static string disabled_group_id = "s3-bucket-replication:disabled";
+
+struct ReplicationConfiguration {
+  string role;
+
+  struct Rule {
+    struct DeleteMarkerReplication {
+      string status;
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("Status", status, obj);
+      }
+
+      void dump_xml(Formatter *f) const {
+        encode_xml("Status", status, f);
+      }
+
+      bool is_valid(CephContext *cct) const {
+        bool result = is_valid_status(status);
+        if (!result) {
+          ldout(cct, 5) << "NOTICE: bad status provided in DeleteMarkerReplication element (status=" << status << ")" << dendl;
+        }
+        return result;
+      }
+    };
+
+    struct Source { /* rgw extension */
+      std::vector<string> zone_names;
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("Zone", zone_names, obj);
+      }
+
+      void dump_xml(Formatter *f) const {
+        encode_xml("Zone", zone_names, f);
+      }
+    };
+
+    struct Destination {
+      struct AccessControlTranslation {
+        string owner;
+
+        void decode_xml(XMLObj *obj) {
+          RGWXMLDecoder::decode_xml("Owner", owner, obj);
+        }
+        void dump_xml(Formatter *f) const {
+          encode_xml("Owner", owner, f);
+        }
+      };
+
+      std::optional<AccessControlTranslation> acl_translation;
+      std::optional<string> account;
+      string bucket;
+      std::optional<string> storage_class;
+      std::vector<string> zone_names;
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("AccessControlTranslation", acl_translation, obj);
+        RGWXMLDecoder::decode_xml("Account", account, obj);
+        if (account && account->empty()) {
+          account.reset();
+        }
+        RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+        RGWXMLDecoder::decode_xml("StorageClass", storage_class, obj);
+        if (storage_class && storage_class->empty()) {
+          storage_class.reset();
+        }
+        RGWXMLDecoder::decode_xml("Zone", zone_names, obj); /* rgw extension */
+      }
+
+      void dump_xml(Formatter *f) const {
+        encode_xml("AccessControlTranslation", acl_translation, f);
+        encode_xml("Account", account, f);
+        encode_xml("Bucket", bucket, f);
+        encode_xml("StorageClass", storage_class, f);
+        encode_xml("Zone", zone_names, f);
+      }
+    };
+
+    struct Filter {
+      struct Tag {
+        string key;
+        string value;
+
+        bool empty() const {
+          return key.empty() && value.empty();
+        }
+
+        void decode_xml(XMLObj *obj) {
+          RGWXMLDecoder::decode_xml("Key", key, obj);
+          RGWXMLDecoder::decode_xml("Value", value, obj);
+        };
+
+        void dump_xml(Formatter *f) const {
+          encode_xml("Key", key, f);
+          encode_xml("Value", value, f);
+        }
+      };
+
+      struct AndElements {
+        std::optional<string> prefix;
+        std::vector<Tag> tags;
+
+        bool empty() const {
+          return !prefix &&
+            (tags.size() == 0);
+        }
+
+        void decode_xml(XMLObj *obj) {
+          std::vector<Tag> _tags;
+          RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+          if (prefix && prefix->empty()) {
+            prefix.reset();
+          }
+          RGWXMLDecoder::decode_xml("Tag", _tags, obj);
+          for (auto& t : _tags) {
+            if (!t.empty()) {
+              tags.push_back(std::move(t));
+            }
+          }
+        };
+
+        void dump_xml(Formatter *f) const {
+          encode_xml("Prefix", prefix, f);
+          encode_xml("Tag", tags, f);
+        }
+      };
+
+      std::optional<string> prefix;
+      std::optional<Tag> tag;
+      std::optional<AndElements> and_elements;
+
+      bool empty() const {
+        return (!prefix && !tag && !and_elements);
+      }
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+        if (prefix && prefix->empty()) {
+          prefix.reset();
+        }
+        RGWXMLDecoder::decode_xml("Tag", tag, obj);
+        if (tag && tag->empty()) {
+          tag.reset();
+        }
+        RGWXMLDecoder::decode_xml("And", and_elements, obj);
+        if (and_elements && and_elements->empty()) {
+          and_elements.reset();
+        }
+      };
+
+      void dump_xml(Formatter *f) const {
+        encode_xml("Prefix", prefix, f);
+        encode_xml("Tag", tag, f);
+        encode_xml("And", and_elements, f);
+      }
+
+      bool is_valid(CephContext *cct) const {
+        if (tag && prefix) {
+          ldout(cct, 5) << "NOTICE: both tag and prefix were provided in replication filter rule" << dendl;
+          return false;
+        }
+
+        if (and_elements) {
+          if (prefix && and_elements->prefix) {
+            ldout(cct, 5) << "NOTICE: too many prefixes were provided in re" << dendl;
+            return false;
+          }
+        }
+        return true;
+      };
+
+      int to_sync_pipe_filter(CephContext *cct,
+                              rgw_sync_pipe_filter *f) const {
+        if (!is_valid(cct)) {
+          return -EINVAL;
+        }
+        if (prefix) {
+          f->prefix = *prefix;
+        }
+        if (tag) {
+          f->tags.insert(rgw_sync_pipe_filter_tag(tag->key, tag->value));
+        }
+
+        if (and_elements) {
+          if (and_elements->prefix) {
+            f->prefix = *and_elements->prefix;
+          }
+          for (auto& t : and_elements->tags) {
+            f->tags.insert(rgw_sync_pipe_filter_tag(t.key, t.value));
+          }
+        }
+        return 0;
+      }
+
+      void from_sync_pipe_filter(const rgw_sync_pipe_filter& f) {
+        if (f.prefix && f.tags.empty()) {
+          prefix = f.prefix;
+          return;
+        }
+        if (f.prefix) {
+          and_elements.emplace();
+          and_elements->prefix = f.prefix;
+        } else if (f.tags.size() == 1) {
+          auto iter = f.tags.begin();
+          if (iter == f.tags.end()) {
+            /* should never happen */
+            return;
+          }
+          auto& t = *iter;
+          tag.emplace();
+          tag->key = t.key;
+          tag->value = t.value;
+          return;
+        }
+
+        if (f.tags.empty()) {
+          return;
+        }
+
+        if (!and_elements) {
+          and_elements.emplace();
+        }
+
+        for (auto& t : f.tags) {
+          auto& tag = and_elements->tags.emplace_back();
+          tag.key = t.key;
+          tag.value = t.value;
+        }
+      }
+    };
+
+    set<rgw_zone_id> get_zone_ids_from_names(rgw::sal::Store* store,
+                                             const vector<string>& zone_names) const {
+      set<rgw_zone_id> ids;
+
+      for (auto& name : zone_names) {
+        rgw_zone_id id;
+        if (static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->find_zone_id_by_name(name, &id)) {
+          ids.insert(std::move(id));
+        }
+      }
+
+      return ids;
+    }
+
+    vector<string> get_zone_names_from_ids(rgw::sal::Store* store,
+                                           const set<rgw_zone_id>& zone_ids) const {
+      vector<string> names;
+
+      for (auto& id : zone_ids) {
+        RGWZone *zone;
+        if (static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->find_zone(id, &zone)) {
+          names.emplace_back(zone->name);
+        }
+      }
+
+      return names;
+    }
+
+    std::optional<DeleteMarkerReplication> delete_marker_replication;
+    std::optional<Source> source;
+    Destination destination;
+    std::optional<Filter> filter;
+    string id;
+    int32_t priority;
+    string status;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("DeleteMarkerReplication", delete_marker_replication, obj);
+      RGWXMLDecoder::decode_xml("Source", source, obj);
+      RGWXMLDecoder::decode_xml("Destination", destination, obj);
+      RGWXMLDecoder::decode_xml("ID", id, obj);
+
+      std::optional<string> prefix;
+      RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+      if (prefix) {
+        filter.emplace();
+        filter->prefix = prefix;
+      }
+
+      if (!filter) {
+        RGWXMLDecoder::decode_xml("Filter", filter, obj);
+      } else {
+        /* don't want to have filter reset because it might have been initialized
+         * when decoding prefix
+         */
+        RGWXMLDecoder::decode_xml("Filter", *filter, obj);
+      }
+
+      RGWXMLDecoder::decode_xml("Priority", priority, obj);
+      RGWXMLDecoder::decode_xml("Status", status, obj);
+    }
+
+    void dump_xml(Formatter *f) const {
+      encode_xml("DeleteMarkerReplication", delete_marker_replication, f);
+      encode_xml("Source", source, f);
+      encode_xml("Destination", destination, f);
+      encode_xml("Filter", filter, f);
+      encode_xml("ID", id, f);
+      encode_xml("Priority", priority, f);
+      encode_xml("Status", status, f);
+    }
+
+    bool is_valid(CephContext *cct) const {
+      if (!is_valid_status(status)) {
+        ldout(cct, 5) << "NOTICE: bad status provided in rule (status=" << status << ")" << dendl;
+        return false;
+      }
+      if ((filter && !filter->is_valid(cct)) ||
+          (delete_marker_replication && !delete_marker_replication->is_valid(cct))) {
+        return false;
+      }
+      return true;
+    }
+
+    int to_sync_policy_pipe(req_state *s, rgw::sal::Store* store,
+                            rgw_sync_bucket_pipes *pipe,
+                            bool *enabled) const {
+      if (!is_valid(s->cct)) {
+        return -EINVAL;
+      }
+
+      pipe->id = id;
+      pipe->params.priority = priority;
+
+      const auto& user_id = s->user->get_id();
+
+      rgw_bucket_key dest_bk(user_id.tenant,
+                             destination.bucket);
+
+      if (source && !source->zone_names.empty()) {
+        pipe->source.zones = get_zone_ids_from_names(store, source->zone_names);
+      } else {
+        pipe->source.set_all_zones(true);
+      }
+      if (!destination.zone_names.empty()) {
+        pipe->dest.zones = get_zone_ids_from_names(store, destination.zone_names);
+      } else {
+        pipe->dest.set_all_zones(true);
+      }
+      pipe->dest.bucket.emplace(dest_bk);
+
+      if (filter) {
+        int r = filter->to_sync_pipe_filter(s->cct, &pipe->params.source.filter);
+        if (r < 0) {
+          return r;
+        }
+      }
+      if (destination.acl_translation) {
+        rgw_user u;
+        u.tenant = user_id.tenant;
+        u.from_str(destination.acl_translation->owner); /* explicit tenant will override tenant,
+                                                           otherwise will inherit it from s->user */
+        pipe->params.dest.acl_translation.emplace();
+        pipe->params.dest.acl_translation->owner = u;
+      }
+      pipe->params.dest.storage_class = destination.storage_class;
+
+      *enabled = (status == "Enabled");
+
+      pipe->params.mode = rgw_sync_pipe_params::Mode::MODE_USER;
+      pipe->params.user = user_id.to_str();
+
+      return 0;
+    }
+
+    void from_sync_policy_pipe(rgw::sal::Store* store,
+                              const rgw_sync_bucket_pipes& pipe,
+                              bool enabled) {
+      id = pipe.id;
+      status = (enabled ? "Enabled" : "Disabled");
+      priority = pipe.params.priority;
+
+      if (pipe.source.all_zones) {
+        source.reset();
+      } else if (pipe.source.zones) {
+        source.emplace();
+        source->zone_names = get_zone_names_from_ids(store, *pipe.source.zones);
+      }
+
+      if (!pipe.dest.all_zones &&
+          pipe.dest.zones) {
+        destination.zone_names = get_zone_names_from_ids(store, *pipe.dest.zones);
+      }
+
+      if (pipe.params.dest.acl_translation) {
+        destination.acl_translation.emplace();
+        destination.acl_translation->owner = pipe.params.dest.acl_translation->owner.to_str();
+      }
+
+      if (pipe.params.dest.storage_class) {
+        destination.storage_class = *pipe.params.dest.storage_class;
+      }
+
+      if (pipe.dest.bucket) {
+        destination.bucket = pipe.dest.bucket->get_key();
+      }
+
+      filter.emplace();
+      filter->from_sync_pipe_filter(pipe.params.source.filter);
+
+      if (filter->empty()) {
+        filter.reset();
+      }
+    }
+  };
+
+  std::vector<Rule> rules;
+
+  void decode_xml(XMLObj *obj) {
+    RGWXMLDecoder::decode_xml("Role", role, obj);
+    RGWXMLDecoder::decode_xml("Rule", rules, obj);
+  }
+
+  void dump_xml(Formatter *f) const {
+    encode_xml("Role", role, f);
+    encode_xml("Rule", rules, f);
+  }
+
+  int to_sync_policy_groups(req_state *s, rgw::sal::Store* store,
+                            vector<rgw_sync_policy_group> *result) const {
+    result->resize(2);
+
+    rgw_sync_policy_group& enabled_group = (*result)[0];
+    rgw_sync_policy_group& disabled_group = (*result)[1];
+
+    enabled_group.id = enabled_group_id;
+    enabled_group.status = rgw_sync_policy_group::Status::ENABLED;
+    disabled_group.id = disabled_group_id;
+    disabled_group.status = rgw_sync_policy_group::Status::ALLOWED; /* not enabled, not forbidden */
+
+    for (auto& rule : rules) {
+      rgw_sync_bucket_pipes pipe;
+      bool enabled;
+      int r = rule.to_sync_policy_pipe(s, store, &pipe, &enabled);
+      if (r < 0) {
+        ldpp_dout(s, 5) << "NOTICE: failed to convert replication configuration into sync policy pipe (rule.id=" << rule.id << "): " << cpp_strerror(-r) << dendl;
+        return r;
+      }
+
+      if (enabled) {
+        enabled_group.pipes.emplace_back(std::move(pipe));
+      } else {
+        disabled_group.pipes.emplace_back(std::move(pipe));
+      }
+    }
+    return 0;
+  }
+
+  void from_sync_policy_group(rgw::sal::Store* store,
+                              const rgw_sync_policy_group& group) {
+
+    bool enabled = (group.status == rgw_sync_policy_group::Status::ENABLED);
+
+    for (auto& pipe : group.pipes) {
+      auto& rule = rules.emplace_back();
+      rule.from_sync_policy_pipe(store, pipe, enabled);
+    }
+  }
+};
+
+}
+
+void RGWGetBucketReplication_ObjStore_S3::send_response_data()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  ReplicationConfiguration conf;
+
+  if (s->bucket->get_info().sync_policy) {
+    auto policy = s->bucket->get_info().sync_policy;
+
+    auto iter = policy->groups.find(enabled_group_id);
+    if (iter != policy->groups.end()) {
+      conf.from_sync_policy_group(store, iter->second);
+    }
+    iter = policy->groups.find(disabled_group_id);
+    if (iter != policy->groups.end()) {
+      conf.from_sync_policy_group(store, iter->second);
+    }
+  }
+
+  if (!op_ret) {
+  s->formatter->open_object_section_in_ns("ReplicationConfiguration", XMLNS_AWS_S3);
+  conf.dump_xml(s->formatter);
+  s->formatter->close_section();
+  rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+}
+
+int RGWPutBucketReplication_ObjStore_S3::get_params(optional_yield y)
+{
+  RGWXMLParser parser;
+
+  if (!parser.init()){
+    return -EINVAL;
+  }
+
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  int r = 0;
+  bufferlist data;
+
+  std::tie(r, data) = read_all_input(s, max_size, false);
+
+  if (r < 0)
+    return r;
+
+  if (!parser.parse(data.c_str(), data.length(), 1)) {
+    return -ERR_MALFORMED_XML;
+  }
+
+  ReplicationConfiguration conf;
+  try {
+    RGWXMLDecoder::decode_xml("ReplicationConfiguration", conf, &parser);
+  } catch (RGWXMLDecoder::err& err) {
+
+    ldpp_dout(this, 5) << "Malformed tagging request: " << err << dendl;
+    return -ERR_MALFORMED_XML;
+  }
+
+  r = conf.to_sync_policy_groups(s, store, &sync_policy_groups);
+  if (r < 0) {
+    return r;
+  }
+
+  // forward requests to meta master zone
+  if (!store->is_meta_master()) {
+    /* only need to keep this data around if we're not meta master */
+    in_data = std::move(data);
+  }
+
+  return 0;
+}
+
+void RGWPutBucketReplication_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
+void RGWDeleteBucketReplication_ObjStore_S3::update_sync_policy(rgw_sync_policy_info *policy)
+{
+  policy->groups.erase(enabled_group_id);
+  policy->groups.erase(disabled_group_id);
+}
+
+void RGWDeleteBucketReplication_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
 void RGWListBuckets_ObjStore_S3::send_response_begin(bool has_buckets)
 {
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
   dump_start(s);
-  end_header(s, NULL, "application/xml");
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, NULL, "application/xml", CHUNKED_TRANSFER_ENCODING);
 
   if (! op_ret) {
     list_all_buckets_start(s);
-    dump_owner(s, s->user->user_id, s->user->display_name);
+    dump_owner(s, s->user->get_id(), s->user->get_display_name());
     s->formatter->open_array_section("Buckets");
     sent_data = true;
   }
 }
 
-void RGWListBuckets_ObjStore_S3::send_response_data(RGWUserBuckets& buckets)
+void RGWListBuckets_ObjStore_S3::send_response_data(rgw::sal::BucketList& buckets)
 {
   if (!sent_data)
     return;
 
-  map<string, RGWBucketEnt>& m = buckets.get_buckets();
-  map<string, RGWBucketEnt>::iterator iter;
+  auto& m = buckets.get_buckets();
 
-  for (iter = m.begin(); iter != m.end(); ++iter) {
-    RGWBucketEnt obj = iter->second;
-    dump_bucket(s, obj);
+  for (auto iter = m.begin(); iter != m.end(); ++iter) {
+    auto& bucket = iter->second;
+    dump_bucket(s, *bucket);
   }
   rgw_flush_formatter(s, s->formatter);
 }
@@ -499,10 +1303,10 @@ void RGWListBuckets_ObjStore_S3::send_response_end()
   }
 }
 
-int RGWGetUsage_ObjStore_S3::get_params()
+int RGWGetUsage_ObjStore_S3::get_params(optional_yield y)
 {
   start_date = s->info.args.get("start-date");
-  end_date = s->info.args.get("end-date"); 
+  end_date = s->info.args.get("end-date");
   return 0;
 }
 
@@ -515,22 +1319,22 @@ static void dump_usage_categories_info(Formatter *formatter, const rgw_usage_log
       continue;
     const rgw_usage_data& usage = uiter->second;
     formatter->open_object_section("Entry");
-    formatter->dump_string("Category", uiter->first);
-    formatter->dump_int("BytesSent", usage.bytes_sent);
-    formatter->dump_int("BytesReceived", usage.bytes_received);
-    formatter->dump_int("Ops", usage.ops);
-    formatter->dump_int("SuccessfulOps", usage.successful_ops);
+    encode_json("Category", uiter->first, formatter);
+    encode_json("BytesSent", usage.bytes_sent, formatter);
+    encode_json("BytesReceived", usage.bytes_received, formatter);
+    encode_json("Ops", usage.ops, formatter);
+    encode_json("SuccessfulOps", usage.successful_ops, formatter);
     formatter->close_section(); // Entry
   }
   formatter->close_section(); // Category
 }
 
-static void dump_usage_bucket_info(Formatter *formatter, const std::string& name, const cls_user_bucket_entry& entry)
+static void dump_usage_bucket_info(Formatter *formatter, const std::string& name, const bucket_meta_entry& entry)
 {
   formatter->open_object_section("Entry");
-  formatter->dump_string("Bucket", name);
-  formatter->dump_int("Bytes", entry.size);
-  formatter->dump_int("Bytes_Rounded", entry.size_rounded);
+  encode_json("Bucket", name, formatter);
+  encode_json("Bytes", entry.size, formatter);
+  encode_json("Bytes_Rounded", entry.size_rounded, formatter);
   formatter->close_section(); // entry
 }
 
@@ -540,7 +1344,9 @@ void RGWGetUsage_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   dump_errno(s);
 
-  end_header(s, this, "application/xml");
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
   if (op_ret < 0)
     return;
@@ -548,7 +1354,7 @@ void RGWGetUsage_ObjStore_S3::send_response()
   Formatter *formatter = s->formatter;
   string last_owner;
   bool user_section_open = false;
-  
+
   formatter->open_object_section("Usage");
   if (show_log_entries) {
     formatter->open_array_section("Entries");
@@ -601,10 +1407,10 @@ void RGWGetUsage_ObjStore_S3::send_response()
        rgw_usage_data total_usage;
        entry.sum(total_usage, categories);
        formatter->open_object_section("Total");
-       formatter->dump_int("BytesSent", total_usage.bytes_sent);
-       formatter->dump_int("BytesReceived", total_usage.bytes_received);
-       formatter->dump_int("Ops", total_usage.ops);
-       formatter->dump_int("SuccessfulOps", total_usage.successful_ops);
+       encode_json("BytesSent", total_usage.bytes_sent, formatter);
+       encode_json("BytesReceived", total_usage.bytes_received, formatter);
+       encode_json("Ops", total_usage.ops, formatter);
+       encode_json("SuccessfulOps", total_usage.successful_ops, formatter);
        formatter->close_section(); // total
        formatter->close_section(); // user
      }
@@ -613,9 +1419,17 @@ void RGWGetUsage_ObjStore_S3::send_response()
        formatter->open_object_section("Stats");
      }
 
-     formatter->dump_int("TotalBytes", header.stats.total_bytes);
-     formatter->dump_int("TotalBytesRounded", header.stats.total_bytes_rounded);
-     formatter->dump_int("TotalEntries", header.stats.total_entries);
+     // send info about quota config
+     auto user_info = s->user->get_info();
+     encode_json("QuotaMaxBytes", user_info.user_quota.max_size, formatter);
+     encode_json("QuotaMaxBuckets", user_info.max_buckets, formatter);
+     encode_json("QuotaMaxObjCount", user_info.user_quota.max_objects, formatter);
+     encode_json("QuotaMaxBytesPerBucket", user_info.bucket_quota.max_objects, formatter);
+     encode_json("QuotaMaxObjCountPerBucket", user_info.bucket_quota.max_size, formatter);
+     // send info about user's capacity utilization
+     encode_json("TotalBytes", stats.size, formatter);
+     encode_json("TotalBytesRounded", stats.size_rounded, formatter);
+     encode_json("TotalEntries", stats.num_objects, formatter);
 
      if (s->cct->_conf->rgw_rest_getusage_op_compat) {
        formatter->close_section(); //Stats
@@ -628,72 +1442,292 @@ void RGWGetUsage_ObjStore_S3::send_response()
   formatter->open_object_section("User");
   formatter->open_array_section("Buckets");
   for (const auto& biter : buckets_usage) {
-    const cls_user_bucket_entry& entry = biter.second;
+    const bucket_meta_entry& entry = biter.second;
     dump_usage_bucket_info(formatter, biter.first, entry);
   }
   formatter->close_section(); // Buckets
   formatter->close_section(); // User
   formatter->close_section(); // CapacityUsed
 
-  formatter->close_section(); // usage
-  rgw_flush_formatter_and_reset(s, s->formatter);
-}
+  formatter->close_section(); // usage
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+int RGWListBucket_ObjStore_S3::get_common_params()
+{
+  list_versions = s->info.args.exists("versions");
+  prefix = s->info.args.get("prefix");
+
+  // non-standard
+  s->info.args.get_bool("allow-unordered", &allow_unordered, false);
+  delimiter = s->info.args.get("delimiter");
+  max_keys = s->info.args.get("max-keys");
+  op_ret = parse_max_keys();
+  if (op_ret < 0) {
+   return op_ret;
+  }
+  encoding_type = s->info.args.get("encoding-type");
+  if (s->system_request) {
+    s->info.args.get_bool("objs-container", &objs_container, false);
+    const char *shard_id_str = s->info.env->get("HTTP_RGWX_SHARD_ID");
+    if (shard_id_str) {
+      string err;
+      shard_id = strict_strtol(shard_id_str, 10, &err);
+      if (!err.empty()) {
+        ldpp_dout(this, 5) << "bad shard id specified: " << shard_id_str << dendl;
+        return -EINVAL;
+      }
+    } else {
+     shard_id = s->bucket_instance_shard_id;
+    }
+  }
+  return 0;
+}
+
+int RGWListBucket_ObjStore_S3::get_params(optional_yield y)
+{
+  int ret = get_common_params();
+  if (ret < 0) {
+    return ret;
+  }
+  if (!list_versions) {
+    marker = s->info.args.get("marker");
+  } else {
+    marker.name = s->info.args.get("key-marker");
+    marker.instance = s->info.args.get("version-id-marker");
+  }
+  return 0;
+}
+
+int RGWListBucket_ObjStore_S3v2::get_params(optional_yield y)
+{
+int ret = get_common_params();
+if (ret < 0) {
+  return ret;
+}
+s->info.args.get_bool("fetch-owner", &fetchOwner, false);
+startAfter = s->info.args.get("start-after", &start_after_exist);
+continuation_token = s->info.args.get("continuation-token", &continuation_token_exist);
+if(!continuation_token_exist) {
+  marker = startAfter;
+} else {
+  marker = continuation_token;
+}
+return 0;
+}
+
+void RGWListBucket_ObjStore_S3::send_common_versioned_response()
+{
+  if (!s->bucket_tenant.empty()) {
+    s->formatter->dump_string("Tenant", s->bucket_tenant);
+  }
+  s->formatter->dump_string("Name", s->bucket_name);
+  s->formatter->dump_string("Prefix", prefix);
+  s->formatter->dump_int("MaxKeys", max);
+  if (!delimiter.empty()) {
+    s->formatter->dump_string("Delimiter", delimiter);
+  }
+  s->formatter->dump_string("IsTruncated", (max && is_truncated ? "true"
+              : "false"));
+
+  if (!common_prefixes.empty()) {
+      map<string, bool>::iterator pref_iter;
+      for (pref_iter = common_prefixes.begin();
+      pref_iter != common_prefixes.end(); ++pref_iter) {
+      s->formatter->open_array_section("CommonPrefixes");
+      if (encode_key) {
+        s->formatter->dump_string("Prefix", url_encode(pref_iter->first, false));
+      } else {
+        s->formatter->dump_string("Prefix", pref_iter->first);
+      }
+
+      s->formatter->close_section();
+      }
+    }
+  }
+
+void RGWListBucket_ObjStore_S3::send_versioned_response()
+{
+  s->formatter->open_object_section_in_ns("ListVersionsResult", XMLNS_AWS_S3);
+  if (strcasecmp(encoding_type.c_str(), "url") == 0) {
+    s->formatter->dump_string("EncodingType", "url");
+    encode_key = true;
+  }
+  RGWListBucket_ObjStore_S3::send_common_versioned_response();
+  s->formatter->dump_string("KeyMarker", marker.name);
+  s->formatter->dump_string("VersionIdMarker", marker.instance);
+  if (is_truncated && !next_marker.empty()) {
+    s->formatter->dump_string("NextKeyMarker", next_marker.name);
+    if (next_marker.instance.empty()) {
+      s->formatter->dump_string("NextVersionIdMarker", "null");
+    }
+    else {
+      s->formatter->dump_string("NextVersionIdMarker", next_marker.instance);
+    }
+  }
+
+  if (op_ret >= 0) {
+    if (objs_container) {
+      s->formatter->open_array_section("Entries");
+    }
+
+    vector<rgw_bucket_dir_entry>::iterator iter;
+    for (iter = objs.begin(); iter != objs.end(); ++iter) {
+      const char *section_name = (iter->is_delete_marker() ? "DeleteMarker"
+          : "Version");
+      s->formatter->open_object_section(section_name);
+      if (objs_container) {
+        s->formatter->dump_bool("IsDeleteMarker", iter->is_delete_marker());
+      }
+      rgw_obj_key key(iter->key);
+      if (encode_key) {
+        string key_name;
+        url_encode(key.name, key_name);
+        s->formatter->dump_string("Key", key_name);
+      }
+      else {
+        s->formatter->dump_string("Key", key.name);
+      }
+      string version_id = key.instance;
+      if (version_id.empty()) {
+        version_id = "null";
+      }
+      if (s->system_request) {
+        if (iter->versioned_epoch > 0) {
+          s->formatter->dump_int("VersionedEpoch", iter->versioned_epoch);
+        }
+        s->formatter->dump_string("RgwxTag", iter->tag);
+        utime_t ut(iter->meta.mtime);
+        ut.gmtime_nsec(s->formatter->dump_stream("RgwxMtime"));
+      }
+      s->formatter->dump_string("VersionId", version_id);
+      s->formatter->dump_bool("IsLatest", iter->is_current());
+      dump_time(s, "LastModified", iter->meta.mtime);
+      if (!iter->is_delete_marker()) {
+        s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
+        s->formatter->dump_int("Size", iter->meta.accounted_size);
+        auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
+        s->formatter->dump_string("StorageClass", storage_class.c_str());
+      }
+      dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
+      if (iter->meta.appendable) {
+        s->formatter->dump_string("Type", "Appendable");
+      } else {
+        s->formatter->dump_string("Type", "Normal");
+      }
+      s->formatter->close_section(); // Version/DeleteMarker
+    }
+    if (objs_container) {
+      s->formatter->close_section(); // Entries
+    }
+    s->formatter->close_section(); // ListVersionsResult
+  }
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+
+void RGWListBucket_ObjStore_S3::send_common_response()
+{
+  if (!s->bucket_tenant.empty()) {
+    s->formatter->dump_string("Tenant", s->bucket_tenant);
+  }
+  s->formatter->dump_string("Name", s->bucket_name);
+  s->formatter->dump_string("Prefix", prefix);
+  s->formatter->dump_int("MaxKeys", max);
+  if (!delimiter.empty()) {
+    s->formatter->dump_string("Delimiter", delimiter);
+  }
+  s->formatter->dump_string("IsTruncated", (max && is_truncated ? "true"
+              : "false"));
+
+    if (!common_prefixes.empty()) {
+      map<string, bool>::iterator pref_iter;
+      for (pref_iter = common_prefixes.begin();
+      pref_iter != common_prefixes.end(); ++pref_iter) {
+      s->formatter->open_array_section("CommonPrefixes");
+      if (encode_key) {
+        s->formatter->dump_string("Prefix", url_encode(pref_iter->first, false));
+      } else {
+        s->formatter->dump_string("Prefix", pref_iter->first);
+      }
+      s->formatter->close_section();
+      }
+    }
+  }
 
-int RGWListBucket_ObjStore_S3::get_params()
+void RGWListBucket_ObjStore_S3::send_response()
 {
-  list_versions = s->info.args.exists("versions");
-  prefix = s->info.args.get("prefix");
-  if (!list_versions) {
-    marker = s->info.args.get("marker");
-  } else {
-    marker.name = s->info.args.get("key-marker");
-    marker.instance = s->info.args.get("version-id-marker");
+  if (op_ret < 0) {
+    set_req_state_err(s, op_ret);
   }
-  max_keys = s->info.args.get("max-keys");
-  op_ret = parse_max_keys();
+  dump_errno(s);
+
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  dump_start(s);
   if (op_ret < 0) {
-    return op_ret;
+    return;
   }
-  delimiter = s->info.args.get("delimiter");
-  encoding_type = s->info.args.get("encoding-type");
-  if (s->system_request) {
-    s->info.args.get_bool("objs-container", &objs_container, false);
-    const char *shard_id_str = s->info.env->get("HTTP_RGWX_SHARD_ID");
-    if (shard_id_str) {
-      string err;
-      shard_id = strict_strtol(shard_id_str, 10, &err);
-      if (!err.empty()) {
-        ldout(s->cct, 5) << "bad shard id specified: " << shard_id_str << dendl;
-        return -EINVAL;
+  if (list_versions) {
+    send_versioned_response();
+    return;
+  }
+
+  s->formatter->open_object_section_in_ns("ListBucketResult", XMLNS_AWS_S3);
+  if (strcasecmp(encoding_type.c_str(), "url") == 0) {
+    s->formatter->dump_string("EncodingType", "url");
+    encode_key = true;
+  }
+  RGWListBucket_ObjStore_S3::send_common_response();
+    if (op_ret >= 0) {
+      vector<rgw_bucket_dir_entry>::iterator iter;
+      for (iter = objs.begin(); iter != objs.end(); ++iter) {
+        rgw_obj_key key(iter->key);
+        s->formatter->open_array_section("Contents");
+        if (encode_key) {
+          string key_name;
+          url_encode(key.name, key_name);
+          s->formatter->dump_string("Key", key_name);
+      } else {
+          s->formatter->dump_string("Key", key.name);
       }
-    } else {
-      shard_id = s->bucket_instance_shard_id;
+        dump_time(s, "LastModified", iter->meta.mtime);
+        s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
+        s->formatter->dump_int("Size", iter->meta.accounted_size);
+        auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
+        s->formatter->dump_string("StorageClass", storage_class.c_str());
+        dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
+        if (s->system_request) {
+          s->formatter->dump_string("RgwxTag", iter->tag);
+      }
+        if (iter->meta.appendable) {
+          s->formatter->dump_string("Type", "Appendable");
+      } else {
+          s->formatter->dump_string("Type", "Normal");
+      }
+        s->formatter->close_section();
     }
   }
-  return 0;
+    s->formatter->dump_string("Marker", marker.name);
+    if (is_truncated && !next_marker.empty()) {
+      s->formatter->dump_string("NextMarker", next_marker.name);
+    }
+    s->formatter->close_section();
+    rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void RGWListBucket_ObjStore_S3::send_versioned_response()
+void RGWListBucket_ObjStore_S3v2::send_versioned_response()
 {
   s->formatter->open_object_section_in_ns("ListVersionsResult", XMLNS_AWS_S3);
-  if (!s->bucket_tenant.empty())
-    s->formatter->dump_string("Tenant", s->bucket_tenant);
-  s->formatter->dump_string("Name", s->bucket_name);
-  s->formatter->dump_string("Prefix", prefix);
-  s->formatter->dump_string("KeyMarker", marker.name);
-  s->formatter->dump_string("VersionIdMarker", marker.instance);
+  RGWListBucket_ObjStore_S3v2::send_common_versioned_response();
+  s->formatter->dump_string("KeyContinuationToken", marker.name);
+  s->formatter->dump_string("VersionIdContinuationToken", marker.instance);
   if (is_truncated && !next_marker.empty()) {
-    s->formatter->dump_string("NextKeyMarker", next_marker.name);
-    s->formatter->dump_string("NextVersionIdMarker", next_marker.instance);
+    s->formatter->dump_string("NextKeyContinuationToken", next_marker.name);
+    s->formatter->dump_string("NextVersionIdContinuationToken", next_marker.instance);
   }
-  s->formatter->dump_int("MaxKeys", max);
-  if (!delimiter.empty())
-    s->formatter->dump_string("Delimiter", delimiter);
-
-  s->formatter->dump_string("IsTruncated", (max && is_truncated ? "true"
-                                           : "false"));
 
-  bool encode_key = false;
   if (strcasecmp(encoding_type.c_str(), "url") == 0) {
     s->formatter->dump_string("EncodingType", "url");
     encode_key = true;
@@ -706,23 +1740,24 @@ void RGWListBucket_ObjStore_S3::send_versioned_response()
 
     vector<rgw_bucket_dir_entry>::iterator iter;
     for (iter = objs.begin(); iter != objs.end(); ++iter) {
-      const char *section_name = (iter->is_delete_marker() ? "DeleteMarker"
-                                 : "Version");
+      const char *section_name = (iter->is_delete_marker() ? "DeleteContinuationToken"
+          : "Version");
       s->formatter->open_object_section(section_name);
       if (objs_container) {
-        s->formatter->dump_bool("IsDeleteMarker", iter->is_delete_marker());
+        s->formatter->dump_bool("IsDeleteContinuationToken", iter->is_delete_marker());
       }
       rgw_obj_key key(iter->key);
       if (encode_key) {
-       string key_name;
-       url_encode(key.name, key_name);
-       s->formatter->dump_string("Key", key_name);
-      } else {
-       s->formatter->dump_string("Key", key.name);
+        string key_name;
+        url_encode(key.name, key_name);
+        s->formatter->dump_string("Key", key_name);
+      }
+      else {
+        s->formatter->dump_string("Key", key.name);
       }
       string version_id = key.instance;
       if (version_id.empty()) {
-       version_id = "null";
+        version_id = "null";
       }
       if (s->system_request) {
         if (iter->versioned_epoch > 0) {
@@ -734,102 +1769,116 @@ void RGWListBucket_ObjStore_S3::send_versioned_response()
       }
       s->formatter->dump_string("VersionId", version_id);
       s->formatter->dump_bool("IsLatest", iter->is_current());
-      dump_time(s, "LastModified", &iter->meta.mtime);
+      dump_time(s, "LastModified", iter->meta.mtime);
       if (!iter->is_delete_marker()) {
-       s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
-       s->formatter->dump_int("Size", iter->meta.accounted_size);
-       s->formatter->dump_string("StorageClass", "STANDARD");
+        s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
+        s->formatter->dump_int("Size", iter->meta.accounted_size);
+        auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
+        s->formatter->dump_string("StorageClass", storage_class.c_str());
+      }
+      if (fetchOwner == true) {
+        dump_owner(s, s->user->get_id(), s->user->get_display_name());
       }
-      dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
       s->formatter->close_section();
     }
+
+
     if (objs_container) {
       s->formatter->close_section();
     }
 
-    if (!common_prefixes.empty()) {
+     if (!common_prefixes.empty()) {
       map<string, bool>::iterator pref_iter;
       for (pref_iter = common_prefixes.begin();
-          pref_iter != common_prefixes.end(); ++pref_iter) {
-       s->formatter->open_array_section("CommonPrefixes");
-       s->formatter->dump_string("Prefix", pref_iter->first);
-       s->formatter->close_section();
+      pref_iter != common_prefixes.end(); ++pref_iter) {
+      s->formatter->open_array_section("CommonPrefixes");
+      if (encode_key) {
+        s->formatter->dump_string("Prefix", url_encode(pref_iter->first, false));
+      } else {
+        s->formatter->dump_string("Prefix", pref_iter->first);
+      }
+
+      s->formatter->dump_int("KeyCount",objs.size());
+      if (start_after_exist) {
+        s->formatter->dump_string("StartAfter", startAfter);
+      }
+      s->formatter->close_section();
       }
     }
+
+    s->formatter->close_section();
+    rgw_flush_formatter_and_reset(s, s->formatter);
   }
-  s->formatter->close_section();
-  rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void RGWListBucket_ObjStore_S3::send_response()
+void RGWListBucket_ObjStore_S3v2::send_response()
 {
-  if (op_ret < 0)
+  if (op_ret < 0) {
     set_req_state_err(s, op_ret);
+  }
   dump_errno(s);
 
-  end_header(s, this, "application/xml");
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
-  if (op_ret < 0)
+  if (op_ret < 0) {
     return;
-
+  }
   if (list_versions) {
     send_versioned_response();
     return;
   }
 
   s->formatter->open_object_section_in_ns("ListBucketResult", XMLNS_AWS_S3);
-  if (!s->bucket_tenant.empty())
-    s->formatter->dump_string("Tenant", s->bucket_tenant);
-  s->formatter->dump_string("Name", s->bucket_name);
-  s->formatter->dump_string("Prefix", prefix);
-  s->formatter->dump_string("Marker", marker.name);
-  if (is_truncated && !next_marker.empty())
-    s->formatter->dump_string("NextMarker", next_marker.name);
-  s->formatter->dump_int("MaxKeys", max);
-  if (!delimiter.empty())
-    s->formatter->dump_string("Delimiter", delimiter);
-
-  s->formatter->dump_string("IsTruncated", (max && is_truncated ? "true"
-                                           : "false"));
-
-  bool encode_key = false;
   if (strcasecmp(encoding_type.c_str(), "url") == 0) {
     s->formatter->dump_string("EncodingType", "url");
     encode_key = true;
   }
 
+  RGWListBucket_ObjStore_S3::send_common_response();
   if (op_ret >= 0) {
     vector<rgw_bucket_dir_entry>::iterator iter;
     for (iter = objs.begin(); iter != objs.end(); ++iter) {
       rgw_obj_key key(iter->key);
       s->formatter->open_array_section("Contents");
       if (encode_key) {
-       string key_name;
-       url_encode(key.name, key_name);
-       s->formatter->dump_string("Key", key_name);
-      } else {
-       s->formatter->dump_string("Key", key.name);
+        string key_name;
+        url_encode(key.name, key_name);
+        s->formatter->dump_string("Key", key_name);
+      }
+      else {
+        s->formatter->dump_string("Key", key.name);
       }
-      dump_time(s, "LastModified", &iter->meta.mtime);
+      dump_time(s, "LastModified", iter->meta.mtime);
       s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
       s->formatter->dump_int("Size", iter->meta.accounted_size);
-      s->formatter->dump_string("StorageClass", "STANDARD");
-      dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
+      auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
+      s->formatter->dump_string("StorageClass", storage_class.c_str());
+      if (fetchOwner == true) {
+        dump_owner(s, s->user->get_id(), s->user->get_display_name());
+      }
       if (s->system_request) {
         s->formatter->dump_string("RgwxTag", iter->tag);
       }
-      s->formatter->close_section();
-    }
-    if (!common_prefixes.empty()) {
-      map<string, bool>::iterator pref_iter;
-      for (pref_iter = common_prefixes.begin();
-          pref_iter != common_prefixes.end(); ++pref_iter) {
-       s->formatter->open_array_section("CommonPrefixes");
-       s->formatter->dump_string("Prefix", pref_iter->first);
-       s->formatter->close_section();
+      if (iter->meta.appendable) {
+        s->formatter->dump_string("Type", "Appendable");
+      } else {
+        s->formatter->dump_string("Type", "Normal");
       }
+      s->formatter->close_section();
     }
   }
+  if (continuation_token_exist) {
+    s->formatter->dump_string("ContinuationToken", continuation_token);
+  }
+  if (is_truncated && !next_marker.empty()) {
+    s->formatter->dump_string("NextContinuationToken", next_marker.name);
+  }
+  s->formatter->dump_int("KeyCount", objs.size() + common_prefixes.size());
+  if (start_after_exist) {
+    s->formatter->dump_string("StartAfter", startAfter);
+  }
   s->formatter->close_section();
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
@@ -854,12 +1903,12 @@ void RGWGetBucketLocation_ObjStore_S3::send_response()
   RGWZoneGroup zonegroup;
   string api_name;
 
-  int ret = store->get_zonegroup(s->bucket_info.zonegroup, zonegroup);
+  int ret = store->get_zone()->get_zonegroup(s->bucket->get_info().zonegroup, zonegroup);
   if (ret >= 0) {
     api_name = zonegroup.api_name;
   } else  {
-    if (s->bucket_info.zonegroup != "default") {
-      api_name = s->bucket_info.zonegroup;
+    if (s->bucket->get_info().zonegroup != "default") {
+      api_name = s->bucket->get_info().zonegroup;
     }
   }
 
@@ -870,6 +1919,8 @@ void RGWGetBucketLocation_ObjStore_S3::send_response()
 
 void RGWGetBucketVersioning_ObjStore_S3::send_response()
 {
+  if (op_ret)
+    set_req_state_err(s, op_ret);
   dump_errno(s);
   end_header(s, this, "application/xml");
   dump_start(s);
@@ -878,82 +1929,102 @@ void RGWGetBucketVersioning_ObjStore_S3::send_response()
   if (versioned) {
     const char *status = (versioning_enabled ? "Enabled" : "Suspended");
     s->formatter->dump_string("Status", status);
+    const char *mfa_status = (mfa_enabled ? "Enabled" : "Disabled");
+    s->formatter->dump_string("MfaDelete", mfa_status);
   }
   s->formatter->close_section();
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-class RGWSetBucketVersioningParser : public RGWXMLParser
-{
-  XMLObj *alloc_obj(const char *el) override {
-    return new XMLObj;
-  }
-
-public:
-  RGWSetBucketVersioningParser() {}
-  ~RGWSetBucketVersioningParser() override {}
-
-  int get_versioning_status(bool *status) {
-    XMLObj *config = find_first("VersioningConfiguration");
-    if (!config)
-      return -EINVAL;
-
-    *status = false;
-
-    XMLObj *field = config->find_first("Status");
-    if (!field)
-      return 0;
+struct ver_config_status {
+  int status{VersioningSuspended};
+
+  enum MFAStatus {
+    MFA_UNKNOWN,
+    MFA_DISABLED,
+    MFA_ENABLED,
+  } mfa_status{MFA_UNKNOWN};
+  int retcode{0};
+
+  void decode_xml(XMLObj *obj) {
+    string status_str;
+    string mfa_str;
+    RGWXMLDecoder::decode_xml("Status", status_str, obj);
+    if (status_str == "Enabled") {
+      status = VersioningEnabled;
+    } else if (status_str != "Suspended") {
+      status = VersioningStatusInvalid;
+    }
 
-    string& s = field->get_data();
 
-    if (stringcasecmp(s, "Enabled") == 0) {
-      *status = true;
-    } else if (stringcasecmp(s, "Suspended") != 0) {
-      return -EINVAL;
+    if (RGWXMLDecoder::decode_xml("MfaDelete", mfa_str, obj)) {
+      if (mfa_str == "Enabled") {
+        mfa_status = MFA_ENABLED;
+      } else if (mfa_str == "Disabled") {
+        mfa_status = MFA_DISABLED;
+      } else {
+        retcode = -EINVAL;
+      }
     }
-
-    return 0;
   }
 };
 
-int RGWSetBucketVersioning_ObjStore_S3::get_params()
+int RGWSetBucketVersioning_ObjStore_S3::get_params(optional_yield y)
 {
-  char *data = nullptr;
-  int len = 0;
-  int r =
-    rgw_rest_read_all_input(s, &data, &len, s->cct->_conf->rgw_max_put_param_size, false);
-  if (r < 0) {
-    return r;
-  }
-  
-  auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
-
-  r = do_aws4_auth_completion();
+  int r = 0;
+  bufferlist data;
+  std::tie(r, data) =
+    read_all_input(s, s->cct->_conf->rgw_max_put_param_size, false);
   if (r < 0) {
     return r;
   }
 
-  RGWSetBucketVersioningParser parser;
-
+  RGWXMLDecoder::XMLParser parser;
   if (!parser.init()) {
-    ldout(s->cct, 0) << "ERROR: failed to initialize parser" << dendl;
-    r = -EIO;
-    return r;
+    ldpp_dout(this, 0) << "ERROR: failed to initialize parser" << dendl;
+    return -EIO;
   }
 
-  if (!parser.parse(data, len, 1)) {
-    ldout(s->cct, 10) << "failed to parse data: " << data << dendl;
+  char* buf = data.c_str();
+  if (!parser.parse(buf, data.length(), 1)) {
+    ldpp_dout(this, 10) << "NOTICE: failed to parse data: " << buf << dendl;
     r = -EINVAL;
     return r;
   }
 
+  ver_config_status status_conf;
+
+  if (!RGWXMLDecoder::decode_xml("VersioningConfiguration", status_conf, &parser)) {
+    ldpp_dout(this, 10) << "NOTICE: bad versioning config input" << dendl;
+    return -EINVAL;
+  }
+
   if (!store->is_meta_master()) {
     /* only need to keep this data around if we're not meta master */
-    in_data.append(data, len);
+    in_data.append(data);
   }
 
-  r = parser.get_versioning_status(&enable_versioning);
-  
+  versioning_status = status_conf.status;
+  if (versioning_status == VersioningStatusInvalid) {
+    r = -EINVAL;
+  }
+
+  if (status_conf.mfa_status != ver_config_status::MFA_UNKNOWN) {
+    mfa_set_status = true;
+    switch (status_conf.mfa_status) {
+      case ver_config_status::MFA_DISABLED:
+        mfa_status = false;
+        break;
+      case ver_config_status::MFA_ENABLED:
+        mfa_status = true;
+        break;
+      default:
+        ldpp_dout(this, 0) << "ERROR: RGWSetBucketVersioning_ObjStore_S3::get_params(optional_yield y): unexpected switch case mfa_status=" << status_conf.mfa_status << dendl;
+        r = -EIO;
+    }
+  } else if (status_conf.retcode < 0) {
+    r = status_conf.retcode;
+  }
   return r;
 }
 
@@ -962,50 +2033,75 @@ void RGWSetBucketVersioning_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s);
+  end_header(s, this, "application/xml");
 }
 
-int RGWSetBucketWebsite_ObjStore_S3::get_params()
+int RGWSetBucketWebsite_ObjStore_S3::get_params(optional_yield y)
 {
-  char *data = nullptr;
-  int len = 0;
   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-  int r = rgw_rest_read_all_input(s, &data, &len, max_size, false);
-
-  if (r < 0) {
-    return r;
-  }
 
-  auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
+  int r = 0;
+  bufferlist data;
+  std::tie(r, data) = read_all_input(s, max_size, false);
 
-  r = do_aws4_auth_completion();
   if (r < 0) {
     return r;
   }
 
-  bufferptr in_ptr(data, len);
-  in_data.append(in_ptr);
+  in_data.append(data);
 
   RGWXMLDecoder::XMLParser parser;
   if (!parser.init()) {
-    ldout(s->cct, 0) << "ERROR: failed to initialize parser" << dendl;
+    ldpp_dout(this, 0) << "ERROR: failed to initialize parser" << dendl;
     return -EIO;
   }
 
-  if (!parser.parse(data, len, 1)) {
-    string str(data, len);
-    ldout(s->cct, 5) << "failed to parse xml: " << str << dendl;
+  char* buf = data.c_str();
+  if (!parser.parse(buf, data.length(), 1)) {
+    ldpp_dout(this, 5) << "failed to parse xml: " << buf << dendl;
     return -EINVAL;
   }
 
   try {
     RGWXMLDecoder::decode_xml("WebsiteConfiguration", website_conf, &parser, true);
   } catch (RGWXMLDecoder::err& err) {
-    string str(data, len);
-    ldout(s->cct, 5) << "unexpected xml: " << str << dendl;
+    ldpp_dout(this, 5) << "unexpected xml: " << buf << dendl;
     return -EINVAL;
   }
 
+  if (website_conf.is_redirect_all && website_conf.redirect_all.hostname.empty()) {
+    s->err.message = "A host name must be provided to redirect all requests (e.g. \"example.com\").";
+    ldpp_dout(this, 5) << s->err.message << dendl;
+    return -EINVAL;
+  } else if (!website_conf.is_redirect_all && !website_conf.is_set_index_doc) {
+    s->err.message = "A value for IndexDocument Suffix must be provided if RedirectAllRequestsTo is empty";
+    ldpp_dout(this, 5) << s->err.message << dendl;
+    return -EINVAL;
+  } else if (!website_conf.is_redirect_all && website_conf.is_set_index_doc &&
+             website_conf.index_doc_suffix.empty()) {
+    s->err.message = "The IndexDocument Suffix is not well formed";
+    ldpp_dout(this, 5) << s->err.message << dendl;
+    return -EINVAL;
+  }
+
+#define WEBSITE_ROUTING_RULES_MAX_NUM      50
+  int max_num = s->cct->_conf->rgw_website_routing_rules_max_num;
+  if (max_num < 0) {
+    max_num = WEBSITE_ROUTING_RULES_MAX_NUM;
+  }
+  int routing_rules_num = website_conf.routing_rules.rules.size();
+  if (routing_rules_num > max_num) {
+    ldpp_dout(this, 4) << "An website routing config can have up to "
+                     << max_num
+                     << " rules, request website routing rules num: "
+                     << routing_rules_num << dendl;
+    op_ret = -ERR_INVALID_WEBSITE_ROUTING_RULES_ERROR;
+    s->err.message = std::to_string(routing_rules_num) +" routing rules provided, the number of routing rules in a website configuration is limited to "
+                     + std::to_string(max_num)
+                     + ".";
+    return -ERR_INVALID_REQUEST;
+  }
+
   return 0;
 }
 
@@ -1014,7 +2110,7 @@ void RGWSetBucketWebsite_ObjStore_S3::send_response()
   if (op_ret < 0)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s);
+  end_header(s, this, "application/xml");
 }
 
 void RGWDeleteBucketWebsite_ObjStore_S3::send_response()
@@ -1024,7 +2120,7 @@ void RGWDeleteBucketWebsite_ObjStore_S3::send_response()
   }
   set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s);
+  end_header(s, this, "application/xml");
 }
 
 void RGWGetBucketWebsite_ObjStore_S3::send_response()
@@ -1039,7 +2135,7 @@ void RGWGetBucketWebsite_ObjStore_S3::send_response()
     return;
   }
 
-  RGWBucketWebsiteConf& conf = s->bucket_info.website_conf;
+  RGWBucketWebsiteConf& conf = s->bucket->get_info().website_conf;
 
   s->formatter->open_object_section_in_ns("WebsiteConfiguration", XMLNS_AWS_S3);
   conf.dump_xml(s->formatter);
@@ -1047,16 +2143,25 @@ void RGWGetBucketWebsite_ObjStore_S3::send_response()
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-static void dump_bucket_metadata(struct req_state *s, RGWBucketEnt& bucket)
+static void dump_bucket_metadata(struct req_state *s, rgw::sal::Bucket* bucket)
 {
-  dump_header(s, "X-RGW-Object-Count", static_cast<long long>(bucket.count));
-  dump_header(s, "X-RGW-Bytes-Used", static_cast<long long>(bucket.size));
+  dump_header(s, "X-RGW-Object-Count", static_cast<long long>(bucket->get_count()));
+  dump_header(s, "X-RGW-Bytes-Used", static_cast<long long>(bucket->get_size()));
+  // only bucket's owner is allowed to get the quota settings of the account
+  if (bucket->is_owner(s->user.get())) {
+    auto user_info = s->user->get_info();
+    dump_header(s, "X-RGW-Quota-User-Size", static_cast<long long>(user_info.user_quota.max_size));
+    dump_header(s, "X-RGW-Quota-User-Objects", static_cast<long long>(user_info.user_quota.max_objects));
+    dump_header(s, "X-RGW-Quota-Max-Buckets", static_cast<long long>(user_info.max_buckets));
+    dump_header(s, "X-RGW-Quota-Bucket-Size", static_cast<long long>(user_info.bucket_quota.max_size));
+    dump_header(s, "X-RGW-Quota-Bucket-Objects", static_cast<long long>(user_info.bucket_quota.max_objects));
+  }
 }
 
 void RGWStatBucket_ObjStore_S3::send_response()
 {
   if (op_ret >= 0) {
-    dump_bucket_metadata(s, bucket);
+    dump_bucket_metadata(s, bucket.get());
   }
 
   set_req_state_err(s, op_ret);
@@ -1066,7 +2171,7 @@ void RGWStatBucket_ObjStore_S3::send_response()
   dump_start(s);
 }
 
-static int create_s3_policy(struct req_state *s, RGWRados *store,
+static int create_s3_policy(struct req_state *s, rgw::sal::Store* store,
                            RGWAccessControlPolicy_S3& s3policy,
                            ACLOwner& owner)
 {
@@ -1074,7 +2179,7 @@ static int create_s3_policy(struct req_state *s, RGWRados *store,
     if (!s->canned_acl.empty())
       return -ERR_INVALID_REQUEST;
 
-    return s3policy.create_from_headers(store, s->info.env, owner);
+    return s3policy.create_from_headers(s, store, s->info.env, owner);
   }
 
   return s3policy.create_canned(owner, s->bucket_owner, s->canned_acl);
@@ -1129,66 +2234,74 @@ public:
   }
 };
 
-int RGWCreateBucket_ObjStore_S3::get_params()
+int RGWCreateBucket_ObjStore_S3::get_params(optional_yield y)
 {
   RGWAccessControlPolicy_S3 s3policy(s->cct);
+  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
 
-  int r = create_s3_policy(s, store, s3policy, s->owner);
+  int r;
+  if (!s->system_request) {
+    r = valid_s3_bucket_name(s->bucket_name, relaxed_names);
+    if (r) return r;
+  }
+
+  r = create_s3_policy(s, store, s3policy, s->owner);
   if (r < 0)
     return r;
 
   policy = s3policy;
 
-  int len = 0;
-  char *data = nullptr;
-
   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-  op_ret = rgw_rest_read_all_input(s, &data, &len, max_size, false);
+
+  int op_ret = 0;
+  bufferlist data;
+  std::tie(op_ret, data) = read_all_input(s, max_size, false);
 
   if ((op_ret < 0) && (op_ret != -ERR_LENGTH_REQUIRED))
     return op_ret;
 
-  auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
+  in_data.append(data);
 
-  const int auth_ret = do_aws4_auth_completion();
-  if (auth_ret < 0) {
-    return auth_ret;
-  }
-  
-  bufferptr in_ptr(data, len);
-  in_data.append(in_ptr);
-
-  if (len) {
+  if (data.length()) {
     RGWCreateBucketParser parser;
 
     if (!parser.init()) {
-      ldout(s->cct, 0) << "ERROR: failed to initialize parser" << dendl;
+      ldpp_dout(this, 0) << "ERROR: failed to initialize parser" << dendl;
       return -EIO;
     }
 
-    bool success = parser.parse(data, len, 1);
-    ldout(s->cct, 20) << "create bucket input data=" << data << dendl;
+    char* buf = data.c_str();
+    bool success = parser.parse(buf, data.length(), 1);
+    ldpp_dout(this, 20) << "create bucket input data=" << buf << dendl;
 
     if (!success) {
-      ldout(s->cct, 0) << "failed to parse input: " << data << dendl;
+      ldpp_dout(this, 0) << "failed to parse input: " << buf << dendl;
       return -EINVAL;
     }
 
     if (!parser.get_location_constraint(location_constraint)) {
-      ldout(s->cct, 0) << "provided input did not specify location constraint correctly" << dendl;
+      ldpp_dout(this, 0) << "provided input did not specify location constraint correctly" << dendl;
       return -EINVAL;
     }
 
-    ldout(s->cct, 10) << "create bucket location constraint: "
+    ldpp_dout(this, 10) << "create bucket location constraint: "
                      << location_constraint << dendl;
   }
 
   size_t pos = location_constraint.find(':');
   if (pos != string::npos) {
-    placement_rule = location_constraint.substr(pos + 1);
+    placement_rule.init(location_constraint.substr(pos + 1), s->info.storage_class);
     location_constraint = location_constraint.substr(0, pos);
+  } else {
+    placement_rule.storage_class = s->info.storage_class;
+  }
+  auto iter = s->info.x_meta_map.find("x-amz-bucket-object-lock-enabled");
+  if (iter != s->info.x_meta_map.end()) {
+    if (!boost::algorithm::iequals(iter->second, "true") && !boost::algorithm::iequals(iter->second, "false")) {
+      return -EINVAL;
+    }
+    obj_lock_enabled = boost::algorithm::iequals(iter->second, "true");
   }
-
   return 0;
 }
 
@@ -1225,27 +2338,30 @@ void RGWDeleteBucket_ObjStore_S3::send_response()
   set_req_state_err(s, r);
   dump_errno(s);
   end_header(s, this);
+}
 
-  if (s->system_request) {
-    JSONFormatter f; /* use json formatter for system requests output */
-
-    f.open_object_section("info");
-    encode_json("object_ver", objv_tracker.read_version, &f);
-    f.close_section();
-    rgw_flush_formatter_and_reset(s, &f);
+static inline void map_qs_metadata(struct req_state* s)
+{
+  /* merge S3 valid user metadata from the query-string into
+   * x_meta_map, which maps them to attributes */
+  const auto& params = const_cast<RGWHTTPArgs&>(s->info.args).get_params();
+  for (const auto& elt : params) {
+    std::string k = boost::algorithm::to_lower_copy(elt.first);
+    if (k.find("x-amz-meta-") == /* offset */ 0) {
+      rgw_add_amz_meta_header(s->info.x_meta_map, k, elt.second);
+    }
   }
 }
 
-int RGWPutObj_ObjStore_S3::get_params()
+int RGWPutObj_ObjStore_S3::get_params(optional_yield y)
 {
   if (!s->length)
     return -ERR_LENGTH_REQUIRED;
 
-  RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-  map<string, bufferlist> src_attrs;
-  size_t pos;
   int ret;
 
+  map_qs_metadata(s);
+
   RGWAccessControlPolicy_S3 s3policy(s->cct);
   ret = create_s3_policy(s, store, s3policy, s->owner);
   if (ret < 0)
@@ -1255,88 +2371,14 @@ int RGWPutObj_ObjStore_S3::get_params()
 
   if_match = s->info.env->get("HTTP_IF_MATCH");
   if_nomatch = s->info.env->get("HTTP_IF_NONE_MATCH");
-  copy_source = url_decode(s->info.env->get("HTTP_X_AMZ_COPY_SOURCE", ""));
-  copy_source_range = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE");
-
-  /* handle x-amz-copy-source */
-  boost::string_view cs_view(copy_source);
-  if (! cs_view.empty()) {
-    if (cs_view[0] == '/')
-      cs_view.remove_prefix(1);
-    copy_source_bucket_name = cs_view.to_string();
-    pos = copy_source_bucket_name.find("/");
-    if (pos == std::string::npos) {
-      ret = -EINVAL;
-      ldout(s->cct, 5) << "x-amz-copy-source bad format" << dendl;
-      return ret;
-    }
-    copy_source_object_name =
-      copy_source_bucket_name.substr(pos + 1, copy_source_bucket_name.size());
-    copy_source_bucket_name = copy_source_bucket_name.substr(0, pos);
-#define VERSION_ID_STR "?versionId="
-    pos = copy_source_object_name.find(VERSION_ID_STR);
-    if (pos == std::string::npos) {
-      copy_source_object_name = url_decode(copy_source_object_name);
-    } else {
-      copy_source_version_id =
-       copy_source_object_name.substr(pos + sizeof(VERSION_ID_STR) - 1);
-      copy_source_object_name =
-       url_decode(copy_source_object_name.substr(0, pos));
-    }
-    pos = copy_source_bucket_name.find(":");
-    if (pos == std::string::npos) {
-       copy_source_tenant_name = s->src_tenant_name;
-    } else {
-       copy_source_tenant_name = copy_source_bucket_name.substr(0, pos);
-       copy_source_bucket_name = copy_source_bucket_name.substr(pos + 1, copy_source_bucket_name.size());
-       if (copy_source_bucket_name.empty()) {
-         ret = -EINVAL;
-         ldout(s->cct, 5) << "source bucket name is empty" << dendl;
-         return ret;
-       }
-    }
-    ret = store->get_bucket_info(obj_ctx,
-                                 copy_source_tenant_name,
-                                 copy_source_bucket_name,
-                                 copy_source_bucket_info,
-                                 NULL, &src_attrs);
-    if (ret < 0) {
-      ldout(s->cct, 5) << __func__ << "(): get_bucket_info() returned ret=" << ret << dendl;
-      return ret;
-    }
-
-    /* handle x-amz-copy-source-range */
-
-    if (copy_source_range) {
-      string range = copy_source_range;
-      pos = range.find("=");
-      if (pos == std::string::npos) {
-        ret = -EINVAL;
-        ldout(s->cct, 5) << "x-amz-copy-source-range bad format" << dendl;
-        return ret;
-      }
-      range = range.substr(pos + 1);
-      pos = range.find("-");
-      if (pos == std::string::npos) {
-        ret = -EINVAL;
-        ldout(s->cct, 5) << "x-amz-copy-source-range bad format" << dendl;
-        return ret;
-      }
-      string first = range.substr(0, pos);
-      string last = range.substr(pos + 1);
-      copy_source_range_fst = strtoull(first.c_str(), NULL, 10);
-      copy_source_range_lst = strtoull(last.c_str(), NULL, 10);
-    }
-
-  } /* copy_source */
 
   /* handle object tagging */
   auto tag_str = s->info.env->get("HTTP_X_AMZ_TAGGING");
   if (tag_str){
-    obj_tags = ceph::make_unique<RGWObjTags>();
+    obj_tags = std::make_unique<RGWObjTags>();
     ret = obj_tags->set_from_string(tag_str);
     if (ret < 0){
-      ldout(s->cct,0) << "setting obj tags failed with " << ret << dendl;
+      ldpp_dout(this,0) << "setting obj tags failed with " << ret << dendl;
       if (ret == -ERR_INVALID_TAG){
         ret = -EINVAL; //s3 returns only -EINVAL for PUT requests
       }
@@ -1345,7 +2387,71 @@ int RGWPutObj_ObjStore_S3::get_params()
     }
   }
 
-  return RGWPutObj_ObjStore::get_params();
+  //handle object lock
+  auto obj_lock_mode_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_MODE");
+  auto obj_lock_date_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE");
+  auto obj_legal_hold_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_LEGAL_HOLD");
+  if (obj_lock_mode_str && obj_lock_date_str) {
+    boost::optional<ceph::real_time> date = ceph::from_iso_8601(obj_lock_date_str);
+    if (boost::none == date || ceph::real_clock::to_time_t(*date) <= ceph_clock_now()) {
+        ret = -EINVAL;
+        ldpp_dout(this,0) << "invalid x-amz-object-lock-retain-until-date value" << dendl;
+        return ret;
+    }
+    if (strcmp(obj_lock_mode_str, "GOVERNANCE") != 0 && strcmp(obj_lock_mode_str, "COMPLIANCE") != 0) {
+        ret = -EINVAL;
+        ldpp_dout(this,0) << "invalid x-amz-object-lock-mode value" << dendl;
+        return ret;
+    }
+    obj_retention = new RGWObjectRetention(obj_lock_mode_str, *date);
+  } else if ((obj_lock_mode_str && !obj_lock_date_str) || (!obj_lock_mode_str && obj_lock_date_str)) {
+    ret = -EINVAL;
+    ldpp_dout(this,0) << "need both x-amz-object-lock-mode and x-amz-object-lock-retain-until-date " << dendl;
+    return ret;
+  }
+  if (obj_legal_hold_str) {
+    if (strcmp(obj_legal_hold_str, "ON") != 0 && strcmp(obj_legal_hold_str, "OFF") != 0) {
+        ret = -EINVAL;
+        ldpp_dout(this,0) << "invalid x-amz-object-lock-legal-hold value" << dendl;
+        return ret;
+    }
+    obj_legal_hold = new RGWObjectLegalHold(obj_legal_hold_str);
+  }
+  if (!s->bucket->get_info().obj_lock_enabled() && (obj_retention || obj_legal_hold)) {
+    ldpp_dout(this, 0) << "ERROR: object retention or legal hold can't be set if bucket object lock not configured" << dendl;
+    ret = -ERR_INVALID_REQUEST;
+    return ret;
+  }
+  multipart_upload_id = s->info.args.get("uploadId");
+  multipart_part_str = s->info.args.get("partNumber");
+  if (!multipart_part_str.empty()) {
+    string err;
+    multipart_part_num = strict_strtol(multipart_part_str.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldpp_dout(s, 10) << "bad part number: " << multipart_part_str << ": " << err << dendl;
+      return -EINVAL;
+    }
+  } else if (!multipart_upload_id.empty()) {
+    ldpp_dout(s, 10) << "part number with no multipart upload id" << dendl;
+    return -EINVAL;
+  }
+
+  append = s->info.args.exists("append");
+  if (append) {
+    string pos_str = s->info.args.get("position");
+    string err;
+    long long pos_tmp = strict_strtoll(pos_str.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldpp_dout(s, 10) << "bad position: " << pos_str << ": " << err << dendl;
+      return -EINVAL;
+    } else if (pos_tmp < 0) {
+      ldpp_dout(s, 10) << "bad position: " << pos_str << ": " << "position shouldn't be negative" << dendl;
+      return -EINVAL;
+    }
+    position = uint64_t(pos_tmp);
+  }
+
+  return RGWPutObj_ObjStore::get_params(y);
 }
 
 int RGWPutObj_ObjStore_S3::get_data(bufferlist& bl)
@@ -1383,14 +2489,21 @@ void RGWPutObj_ObjStore_S3::send_response()
        s->cct->_conf->rgw_s3_success_create_obj_status);
       set_req_state_err(s, op_ret);
     }
+
+    string expires = get_s3_expiration_header(s, mtime);
+
     if (copy_source.empty()) {
       dump_errno(s);
       dump_etag(s, etag);
       dump_content_length(s, 0);
+      dump_header_if_nonempty(s, "x-amz-version-id", version_id);
+      dump_header_if_nonempty(s, "x-amz-expiration", expires);
       for (auto &it : crypt_http_responses)
         dump_header(s, it.first, it.second);
     } else {
       dump_errno(s);
+      dump_header_if_nonempty(s, "x-amz-version-id", version_id);
+      dump_header_if_nonempty(s, "x-amz-expiration", expires);
       end_header(s, this, "application/xml");
       dump_start(s);
       struct tm tmp;
@@ -1409,39 +2522,34 @@ void RGWPutObj_ObjStore_S3::send_response()
       return;
     }
   }
+  if (append) {
+    if (op_ret == 0 || op_ret == -ERR_POSITION_NOT_EQUAL_TO_LENGTH) {
+      dump_header(s, "x-rgw-next-append-position", cur_accounted_size);
+    }
+  }
   if (s->system_request && !real_clock::is_zero(mtime)) {
     dump_epoch_header(s, "Rgwx-Mtime", mtime);
   }
   end_header(s, this);
 }
 
-static inline int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map<string, bufferlist>& attrs)
-{
-  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
-  RGWRados::Object::Read read_op(&op_target);
-
-  read_op.params.attrs = &attrs;
-
-  return read_op.prepare();
-}
-
 static inline void set_attr(map<string, bufferlist>& attrs, const char* key, const std::string& value)
 {
   bufferlist bl;
-  ::encode(value,bl);
+  encode(value,bl);
   attrs.emplace(key, std::move(bl));
 }
 
 static inline void set_attr(map<string, bufferlist>& attrs, const char* key, const char* value)
 {
   bufferlist bl;
-  ::encode(value,bl);
+  encode(value,bl);
   attrs.emplace(key, std::move(bl));
 }
 
 int RGWPutObj_ObjStore_S3::get_decrypt_filter(
-    std::unique_ptr<RGWGetDataCB>* filter,
-    RGWGetDataCB* cb,
+    std::unique_ptr<RGWGetObj_Filter>* filter,
+    RGWGetObj_Filter* cb,
     map<string, bufferlist>& attrs,
     bufferlist* manifest_bl)
 {
@@ -1452,11 +2560,11 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter(
   res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses_unused);
   if (res == 0) {
     if (block_crypt != nullptr) {
-      auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt)));
+      auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt)));
       //RGWGetObj_BlockDecrypt* f = new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt));
       if (f != nullptr) {
         if (manifest_bl != nullptr) {
-          res = f->read_manifest(*manifest_bl);
+          res = f->read_manifest(this, *manifest_bl);
           if (res == 0) {
             *filter = std::move(f);
           }
@@ -1468,32 +2576,24 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter(
 }
 
 int RGWPutObj_ObjStore_S3::get_encrypt_filter(
-    std::unique_ptr<RGWPutObjDataProcessor>* filter,
-    RGWPutObjDataProcessor* cb)
+    std::unique_ptr<rgw::sal::DataProcessor> *filter,
+    rgw::sal::DataProcessor *cb)
 {
   int res = 0;
-  RGWPutObjProcessor_Multipart* multi_processor=dynamic_cast<RGWPutObjProcessor_Multipart*>(cb);
-  if (multi_processor != nullptr) {
-    RGWMPObj* mp = nullptr;
-    multi_processor->get_mp(&mp);
-    if (mp != nullptr) {
-      map<string, bufferlist> xattrs;
-      string meta_oid;
-      meta_oid = mp->get_meta();
-
-      rgw_obj obj;
-      obj.init_ns(s->bucket, meta_oid, RGW_OBJ_NS_MULTIPART);
-      obj.set_in_extra_data(true);
-      res = get_obj_attrs(store, s, obj, xattrs);
-      if (res == 0) {
-        std::unique_ptr<BlockCrypt> block_crypt;
-        /* We are adding to existing object.
-         * We use crypto mode that configured as if we were decrypting. */
-        res = rgw_s3_prepare_decrypt(s, xattrs, &block_crypt, crypt_http_responses);
-        if (res == 0 && block_crypt != nullptr)
-          *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
-              new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
-      }
+  if (!multipart_upload_id.empty()) {
+    std::unique_ptr<rgw::sal::MultipartUpload> upload =
+      s->bucket->get_multipart_upload(s->object->get_name(),
+                                 multipart_upload_id);
+    std::unique_ptr<rgw::sal::Object> obj = upload->get_meta_obj();
+    obj->set_in_extra_data(true);
+    res = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
+    if (res == 0) {
+      std::unique_ptr<BlockCrypt> block_crypt;
+      /* We are adding to existing object.
+       * We use crypto mode that configured as if we were decrypting. */
+      res = rgw_s3_prepare_decrypt(s, obj->get_attrs(), &block_crypt, crypt_http_responses);
+      if (res == 0 && block_crypt != nullptr)
+        filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
     }
     /* it is ok, to not have encryption at all */
   }
@@ -1502,15 +2602,15 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter(
     std::unique_ptr<BlockCrypt> block_crypt;
     res = rgw_s3_prepare_encrypt(s, attrs, nullptr, &block_crypt, crypt_http_responses);
     if (res == 0 && block_crypt != nullptr) {
-      *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
-          new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
+      filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
     }
   }
   return res;
 }
 
-void RGWPostObj_ObjStore_S3::rebuild_key(string& key)
+void RGWPostObj_ObjStore_S3::rebuild_key(rgw::sal::Object* obj)
 {
+  string key = obj->get_name();
   static string var = "${filename}";
   int pos = key.find(var);
   if (pos < 0)
@@ -1520,12 +2620,12 @@ void RGWPostObj_ObjStore_S3::rebuild_key(string& key)
   new_key.append(filename);
   new_key.append(key.substr(pos + var.size()));
 
-  key = new_key;
+  obj->set_key(new_key);
 }
 
 std::string RGWPostObj_ObjStore_S3::get_current_filename() const
 {
-  return s->object.name;
+  return s->object->get_name();
 }
 
 std::string RGWPostObj_ObjStore_S3::get_current_content_type() const
@@ -1533,16 +2633,18 @@ std::string RGWPostObj_ObjStore_S3::get_current_content_type() const
   return content_type;
 }
 
-int RGWPostObj_ObjStore_S3::get_params()
+int RGWPostObj_ObjStore_S3::get_params(optional_yield y)
 {
-  op_ret = RGWPostObj_ObjStore::get_params();
+  op_ret = RGWPostObj_ObjStore::get_params(y);
   if (op_ret < 0) {
     return op_ret;
   }
 
-  ldout(s->cct, 20) << "adding bucket to policy env: " << s->bucket.name
+  map_qs_metadata(s);
+
+  ldpp_dout(this, 20) << "adding bucket to policy env: " << s->bucket->get_name()
                    << dendl;
-  env.add_var("bucket", s->bucket.name);
+  env.add_var("bucket", s->bucket->get_name());
 
   bool done;
   do {
@@ -1551,17 +2653,17 @@ int RGWPostObj_ObjStore_S3::get_params()
     if (r < 0)
       return r;
 
-    if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
-      ldout(s->cct, 20) << "read part header -- part.name="
+    if (s->cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+      ldpp_dout(this, 20) << "read part header -- part.name="
                         << part.name << dendl;
 
       for (const auto& pair : part.fields) {
-        ldout(s->cct, 20) << "field.name=" << pair.first << dendl;
-        ldout(s->cct, 20) << "field.val=" << pair.second.val << dendl;
-        ldout(s->cct, 20) << "field.params:" << dendl;
+        ldpp_dout(this, 20) << "field.name=" << pair.first << dendl;
+        ldpp_dout(this, 20) << "field.val=" << pair.second.val << dendl;
+        ldpp_dout(this, 20) << "field.params:" << dendl;
 
         for (const auto& param_pair : pair.second.params) {
-          ldout(s->cct, 20) << " " << param_pair.first
+          ldpp_dout(this, 20) << " " << param_pair.first
                             << " -> " << param_pair.second << dendl;
         }
       }
@@ -1600,16 +2702,16 @@ int RGWPostObj_ObjStore_S3::get_params()
     return -EINVAL;
   }
 
-  s->object = rgw_obj_key(object_str);
+  s->object = store->get_object(rgw_obj_key(object_str));
 
-  rebuild_key(s->object.name);
+  rebuild_key(s->object.get());
 
-  if (s->object.empty()) {
+  if (rgw::sal::Object::empty(s->object.get())) {
     err_msg = "Empty object name";
     return -EINVAL;
   }
 
-  env.add_var("key", s->object.name);
+  env.add_var("key", s->object->get_name());
 
   part_str(parts, "Content-Type", &content_type);
 
@@ -1618,6 +2720,18 @@ int RGWPostObj_ObjStore_S3::get_params()
     env.add_var("Content-Type", content_type);
   }
 
+  std::string storage_class;
+  part_str(parts, "x-amz-storage-class", &storage_class);
+
+  if (! storage_class.empty()) {
+    s->dest_placement.storage_class = storage_class;
+    if (!store->get_zone()->get_params().valid_placement(s->dest_placement)) {
+      ldpp_dout(this, 0) << "NOTICE: invalid dest placement: " << s->dest_placement.to_str() << dendl;
+      err_msg = "The storage class you specified is not valid";
+      return -EINVAL;
+    }
+  }
+
   map<string, struct post_form_part, ltstr_nocase>::iterator piter =
     parts.upper_bound(RGW_AMZ_META_PREFIX);
   for (; piter != parts.end(); ++piter) {
@@ -1654,7 +2768,7 @@ int RGWPostObj_ObjStore_S3::get_params()
     attrs[attr_name] = attr_bl;
   }
 
-  int r = get_policy();
+  int r = get_policy(y);
   if (r < 0)
     return r;
 
@@ -1675,35 +2789,35 @@ int RGWPostObj_ObjStore_S3::get_tags()
 {
   string tags_str;
   if (part_str(parts, "tagging", &tags_str)) {
-    RGWObjTagsXMLParser parser;
+    RGWXMLParser parser;
     if (!parser.init()){
-      ldout(s->cct, 0) << "Couldn't init RGWObjTags XML parser" << dendl;
+      ldpp_dout(this, 0) << "Couldn't init RGWObjTags XML parser" << dendl;
       err_msg = "Server couldn't process the request";
       return -EINVAL; // TODO: This class of errors in rgw code should be a 5XX error
     }
     if (!parser.parse(tags_str.c_str(), tags_str.size(), 1)) {
-      ldout(s->cct,0 ) << "Invalid Tagging XML" << dendl;
+      ldpp_dout(this,0 ) << "Invalid Tagging XML" << dendl;
       err_msg = "Invalid Tagging XML";
       return -EINVAL;
     }
 
-    RGWObjTagSet_S3 *obj_tags_s3;
-    RGWObjTagging_S3 *tagging;
+    RGWObjTagging_S3 tagging;
 
-    tagging = static_cast<RGWObjTagging_S3 *>(parser.find_first("Tagging"));
-    obj_tags_s3 = static_cast<RGWObjTagSet_S3 *>(tagging->find_first("TagSet"));
-    if(!obj_tags_s3){
-      return -ERR_MALFORMED_XML;
+    try {
+      RGWXMLDecoder::decode_xml("Tagging", tagging, &parser);
+    } catch (RGWXMLDecoder::err& err) {
+      ldpp_dout(this, 5) << "Malformed tagging request: " << err << dendl;
+      return -EINVAL;
     }
 
     RGWObjTags obj_tags;
-    int r = obj_tags_s3->rebuild(obj_tags);
+    int r = tagging.rebuild(obj_tags);
     if (r < 0)
       return r;
 
     bufferlist tags_bl;
     obj_tags.encode(tags_bl);
-    ldout(s->cct, 20) << "Read " << obj_tags.count() << "tags" << dendl;
+    ldpp_dout(this, 20) << "Read " << obj_tags.count() << "tags" << dendl;
     attrs[RGW_ATTR_TAGS] = tags_bl;
   }
 
@@ -1711,7 +2825,7 @@ int RGWPostObj_ObjStore_S3::get_tags()
   return 0;
 }
 
-int RGWPostObj_ObjStore_S3::get_policy()
+int RGWPostObj_ObjStore_S3::get_policy(optional_yield y)
 {
   if (part_bl(parts, "policy", &s->auth.s3_postobj_creds.encoded_policy)) {
     bool aws4_auth = false;
@@ -1720,10 +2834,10 @@ int RGWPostObj_ObjStore_S3::get_policy()
     using rgw::auth::s3::AWS4_HMAC_SHA256_STR;
     if ((part_str(parts, "x-amz-algorithm", &s->auth.s3_postobj_creds.x_amz_algorithm)) &&
         (s->auth.s3_postobj_creds.x_amz_algorithm == AWS4_HMAC_SHA256_STR)) {
-      ldout(s->cct, 0) << "Signature verification algorithm AWS v4 (AWS4-HMAC-SHA256)" << dendl;
+      ldpp_dout(this, 0) << "Signature verification algorithm AWS v4 (AWS4-HMAC-SHA256)" << dendl;
       aws4_auth = true;
     } else {
-      ldout(s->cct, 0) << "Signature verification algorithm AWS v2" << dendl;
+      ldpp_dout(this, 0) << "Signature verification algorithm AWS v2" << dendl;
     }
 
     // check that the signature matches the encoded policy
@@ -1733,7 +2847,7 @@ int RGWPostObj_ObjStore_S3::get_policy()
       /* x-amz-credential handling */
       if (!part_str(parts, "x-amz-credential",
                     &s->auth.s3_postobj_creds.x_amz_credential)) {
-        ldout(s->cct, 0) << "No S3 aws4 credential found!" << dendl;
+        ldpp_dout(this, 0) << "No S3 aws4 credential found!" << dendl;
         err_msg = "Missing aws4 credential";
         return -EINVAL;
       }
@@ -1741,7 +2855,7 @@ int RGWPostObj_ObjStore_S3::get_policy()
       /* x-amz-signature handling */
       if (!part_str(parts, "x-amz-signature",
                     &s->auth.s3_postobj_creds.signature)) {
-        ldout(s->cct, 0) << "No aws4 signature found!" << dendl;
+        ldpp_dout(this, 0) << "No aws4 signature found!" << dendl;
         err_msg = "Missing aws4 signature";
         return -EINVAL;
       }
@@ -1749,7 +2863,7 @@ int RGWPostObj_ObjStore_S3::get_policy()
       /* x-amz-date handling */
       std::string received_date_str;
       if (!part_str(parts, "x-amz-date", &received_date_str)) {
-        ldout(s->cct, 0) << "No aws4 date found!" << dendl;
+        ldpp_dout(this, 0) << "No aws4 date found!" << dendl;
         err_msg = "Missing aws4 date";
         return -EINVAL;
       }
@@ -1759,42 +2873,49 @@ int RGWPostObj_ObjStore_S3::get_policy()
       // check that the signature matches the encoded policy
       if (!part_str(parts, "AWSAccessKeyId",
                     &s->auth.s3_postobj_creds.access_key)) {
-        ldout(s->cct, 0) << "No S3 aws2 access key found!" << dendl;
+        ldpp_dout(this, 0) << "No S3 aws2 access key found!" << dendl;
         err_msg = "Missing aws2 access key";
         return -EINVAL;
       }
 
       if (!part_str(parts, "signature", &s->auth.s3_postobj_creds.signature)) {
-        ldout(s->cct, 0) << "No aws2 signature found!" << dendl;
+        ldpp_dout(this, 0) << "No aws2 signature found!" << dendl;
         err_msg = "Missing aws2 signature";
         return -EINVAL;
       }
     }
 
+    if (part_str(parts, "x-amz-security-token", &s->auth.s3_postobj_creds.x_amz_security_token)) {
+      if (s->auth.s3_postobj_creds.x_amz_security_token.size() == 0) {
+        err_msg = "Invalid token";
+        return -EINVAL;
+      }
+    }
+
     /* FIXME: this is a makeshift solution. The browser upload authentication will be
      * handled by an instance of rgw::auth::Completer spawned in Handler's authorize()
      * method. */
-    const int ret = rgw::auth::Strategy::apply(auth_registry_ptr->get_s3_post(), s);
+    const int ret = rgw::auth::Strategy::apply(this, auth_registry_ptr->get_s3_post(), s, y);
     if (ret != 0) {
       return -EACCES;
     } else {
       /* Populate the owner info. */
-      s->owner.set_id(s->user->user_id);
-      s->owner.set_name(s->user->display_name);
-      ldout(s->cct, 20) << "Successful Signature Verification!" << dendl;
+      s->owner.set_id(s->user->get_id());
+      s->owner.set_name(s->user->get_display_name());
+      ldpp_dout(this, 20) << "Successful Signature Verification!" << dendl;
     }
 
     ceph::bufferlist decoded_policy;
     try {
       decoded_policy.decode_base64(s->auth.s3_postobj_creds.encoded_policy);
     } catch (buffer::error& err) {
-      ldout(s->cct, 0) << "failed to decode_base64 policy" << dendl;
+      ldpp_dout(this, 0) << "failed to decode_base64 policy" << dendl;
       err_msg = "Could not decode policy";
       return -EINVAL;
     }
 
     decoded_policy.append('\0'); // NULL terminate
-    ldout(s->cct, 20) << "POST policy: " << decoded_policy.c_str() << dendl;
+    ldpp_dout(this, 20) << "POST policy: " << decoded_policy.c_str() << dendl;
 
 
     int r = post_policy.from_json(decoded_policy, err_msg);
@@ -1802,7 +2923,7 @@ int RGWPostObj_ObjStore_S3::get_policy()
       if (err_msg.empty()) {
        err_msg = "Failed to parse policy";
       }
-      ldout(s->cct, 0) << "failed to parse policy" << dendl;
+      ldpp_dout(this, 0) << "failed to parse policy" << dendl;
       return -EINVAL;
     }
 
@@ -1821,19 +2942,19 @@ int RGWPostObj_ObjStore_S3::get_policy()
       if (err_msg.empty()) {
        err_msg = "Policy check failed";
       }
-      ldout(s->cct, 0) << "policy check failed" << dendl;
+      ldpp_dout(this, 0) << "policy check failed" << dendl;
       return r;
     }
 
   } else {
-    ldout(s->cct, 0) << "No attached policy found!" << dendl;
+    ldpp_dout(this, 0) << "No attached policy found!" << dendl;
   }
 
   string canned_acl;
   part_str(parts, "acl", &canned_acl);
 
   RGWAccessControlPolicy_S3 s3policy(s->cct);
-  ldout(s->cct, 20) << "canned_acl=" << canned_acl << dendl;
+  ldpp_dout(this, 20) << "canned_acl=" << canned_acl << dendl;
   if (s3policy.create_canned(s->owner, s->bucket_owner, canned_acl) < 0) {
     err_msg = "Bad canned ACLs";
     return -EINVAL;
@@ -1912,7 +3033,7 @@ void RGWPostObj_ObjStore_S3::send_response()
 
     url_encode(s->bucket_tenant, tenant); /* surely overkill, but cheap */
     url_encode(s->bucket_name, bucket);
-    url_encode(s->object.name, key);
+    url_encode(s->object->get_name(), key);
     url_encode(etag_str, etag_url);
 
     if (!s->bucket_tenant.empty()) {
@@ -1975,14 +3096,23 @@ done:
     for (auto &it : crypt_http_responses)
       dump_header(s, it.first, it.second);
     s->formatter->open_object_section("PostResponse");
-    if (g_conf->rgw_dns_name.length())
-      s->formatter->dump_format("Location", "%s/%s",
-                               s->info.script_uri.c_str(),
-                               s->object.name.c_str());
-    if (!s->bucket_tenant.empty())
+    std::string base_uri = compute_domain_uri(s);
+    if (!s->bucket_tenant.empty()){
+      s->formatter->dump_format("Location", "%s/%s:%s/%s",
+                                base_uri.c_str(),
+                                url_encode(s->bucket_tenant).c_str(),
+                                url_encode(s->bucket_name).c_str(),
+                                url_encode(s->object->get_name()).c_str());
       s->formatter->dump_string("Tenant", s->bucket_tenant);
+    } else {
+      s->formatter->dump_format("Location", "%s/%s/%s",
+                                base_uri.c_str(),
+                                url_encode(s->bucket_name).c_str(),
+                                url_encode(s->object->get_name()).c_str());
+    }
     s->formatter->dump_string("Bucket", s->bucket_name);
-    s->formatter->dump_string("Key", s->object.name);
+    s->formatter->dump_string("Key", s->object->get_name());
+    s->formatter->dump_string("ETag", etag);
     s->formatter->close_section();
   }
   s->err.message = err_msg;
@@ -1999,21 +3129,19 @@ done:
 }
 
 int RGWPostObj_ObjStore_S3::get_encrypt_filter(
-    std::unique_ptr<RGWPutObjDataProcessor>* filter, RGWPutObjDataProcessor* cb)
+    std::unique_ptr<rgw::sal::DataProcessor> *filter,
+    rgw::sal::DataProcessor *cb)
 {
-  int res = 0;
   std::unique_ptr<BlockCrypt> block_crypt;
-  res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt, crypt_http_responses);
+  int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt,
+                                   crypt_http_responses);
   if (res == 0 && block_crypt != nullptr) {
-    *filter = std::unique_ptr<RGWPutObj_BlockEncrypt>(
-        new RGWPutObj_BlockEncrypt(s->cct, cb, std::move(block_crypt)));
+    filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
   }
-  else
-    *filter = nullptr;
   return res;
 }
 
-int RGWDeleteObj_ObjStore_S3::get_params()
+int RGWDeleteObj_ObjStore_S3::get_params(optional_yield y)
 {
   const char *if_unmod = s->info.env->get("HTTP_X_AMZ_DELETE_IF_UNMODIFIED_SINCE");
 
@@ -2026,12 +3154,18 @@ int RGWDeleteObj_ObjStore_S3::get_params()
     uint64_t epoch;
     uint64_t nsec;
     if (utime_t::parse_date(if_unmod_decoded, &epoch, &nsec) < 0) {
-      ldout(s->cct, 10) << "failed to parse time: " << if_unmod_decoded << dendl;
+      ldpp_dout(this, 10) << "failed to parse time: " << if_unmod_decoded << dendl;
       return -EINVAL;
     }
     unmod_since = utime_t(epoch, nsec).to_real_time();
   }
 
+  const char *bypass_gov_header = s->info.env->get("HTTP_X_AMZ_BYPASS_GOVERNANCE_RETENTION");
+  if (bypass_gov_header) {
+    std::string bypass_gov_decoded = url_decode(bypass_gov_header);
+    bypass_governance_mode = boost::algorithm::iequals(bypass_gov_decoded, "true");
+  }
+
   return 0;
 }
 
@@ -2045,9 +3179,7 @@ void RGWDeleteObj_ObjStore_S3::send_response()
 
   set_req_state_err(s, r);
   dump_errno(s);
-  if (!version_id.empty()) {
-    dump_header(s, "x-amz-version-id", version_id);
-  }
+  dump_header_if_nonempty(s, "x-amz-version-id", version_id);
   if (delete_marker) {
     dump_header(s, "x-amz-delete-marker", "true");
   }
@@ -2068,8 +3200,39 @@ int RGWCopyObj_ObjStore_S3::init_dest_policy()
   return 0;
 }
 
-int RGWCopyObj_ObjStore_S3::get_params()
+int RGWCopyObj_ObjStore_S3::get_params(optional_yield y)
 {
+  //handle object lock
+  auto obj_lock_mode_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_MODE");
+  auto obj_lock_date_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE");
+  auto obj_legal_hold_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_LEGAL_HOLD");
+  if (obj_lock_mode_str && obj_lock_date_str) {
+    boost::optional<ceph::real_time> date = ceph::from_iso_8601(obj_lock_date_str);
+    if (boost::none == date || ceph::real_clock::to_time_t(*date) <= ceph_clock_now()) {
+      s->err.message = "invalid x-amz-object-lock-retain-until-date value";
+      ldpp_dout(this,0) << s->err.message << dendl;
+      return -EINVAL;
+    }
+    if (strcmp(obj_lock_mode_str, "GOVERNANCE") != 0 && strcmp(obj_lock_mode_str, "COMPLIANCE") != 0) {
+      s->err.message = "invalid x-amz-object-lock-mode value";
+      ldpp_dout(this,0) << s->err.message << dendl;
+      return -EINVAL;
+    }
+    obj_retention = new RGWObjectRetention(obj_lock_mode_str, *date);
+  } else if (obj_lock_mode_str || obj_lock_date_str) {
+    s->err.message = "need both x-amz-object-lock-mode and x-amz-object-lock-retain-until-date ";
+    ldpp_dout(this,0) << s->err.message << dendl;
+    return -EINVAL;
+  }
+  if (obj_legal_hold_str) {
+    if (strcmp(obj_legal_hold_str, "ON") != 0 && strcmp(obj_legal_hold_str, "OFF") != 0) {
+      s->err.message = "invalid x-amz-object-lock-legal-hold value";
+      ldpp_dout(this,0) << s->err.message << dendl;
+      return -EINVAL;
+    }
+    obj_legal_hold = new RGWObjectLegalHold(obj_legal_hold_str);
+  }
+
   if_mod = s->info.env->get("HTTP_X_AMZ_COPY_IF_MODIFIED_SINCE");
   if_unmod = s->info.env->get("HTTP_X_AMZ_COPY_IF_UNMODIFIED_SINCE");
   if_match = s->info.env->get("HTTP_X_AMZ_COPY_IF_MATCH");
@@ -2077,52 +3240,52 @@ int RGWCopyObj_ObjStore_S3::get_params()
 
   src_tenant_name = s->src_tenant_name;
   src_bucket_name = s->src_bucket_name;
-  src_object = s->src_object;
-  dest_tenant_name = s->bucket.tenant;
-  dest_bucket_name = s->bucket.name;
-  dest_object = s->object.name;
+  dest_tenant_name = s->bucket->get_tenant();
+  dest_bucket_name = s->bucket->get_name();
+  dest_obj_name = s->object->get_name();
 
   if (s->system_request) {
     source_zone = s->info.args.get(RGW_SYS_PARAM_PREFIX "source-zone");
     s->info.args.get_bool(RGW_SYS_PARAM_PREFIX "copy-if-newer", &copy_if_newer, false);
-    if (!source_zone.empty()) {
-      client_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "client-id");
-      op_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "op-id");
-
-      if (client_id.empty() || op_id.empty()) {
-       ldout(s->cct, 0) <<
-         RGW_SYS_PARAM_PREFIX "client-id or "
-         RGW_SYS_PARAM_PREFIX "op-id were not provided, "
-         "required for intra-region copy"
-                        << dendl;
-       return -EINVAL;
-      }
-    }
   }
 
-  const char *md_directive = s->info.env->get("HTTP_X_AMZ_METADATA_DIRECTIVE");
-  if (md_directive) {
-    if (strcasecmp(md_directive, "COPY") == 0) {
-      attrs_mod = RGWRados::ATTRSMOD_NONE;
-    } else if (strcasecmp(md_directive, "REPLACE") == 0) {
-      attrs_mod = RGWRados::ATTRSMOD_REPLACE;
+  copy_source = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE");
+  auto tmp_md_d = s->info.env->get("HTTP_X_AMZ_METADATA_DIRECTIVE");
+  if (tmp_md_d) {
+    if (strcasecmp(tmp_md_d, "COPY") == 0) {
+      attrs_mod = rgw::sal::ATTRSMOD_NONE;
+    } else if (strcasecmp(tmp_md_d, "REPLACE") == 0) {
+      attrs_mod = rgw::sal::ATTRSMOD_REPLACE;
     } else if (!source_zone.empty()) {
-      attrs_mod = RGWRados::ATTRSMOD_NONE; // default for intra-zone_group copy
+      attrs_mod = rgw::sal::ATTRSMOD_NONE; // default for intra-zone_group copy
     } else {
-      ldout(s->cct, 0) << "invalid metadata directive" << dendl;
+      s->err.message = "Unknown metadata directive.";
+      ldpp_dout(this, 0) << s->err.message << dendl;
       return -EINVAL;
     }
+    md_directive = tmp_md_d;
   }
 
   if (source_zone.empty() &&
       (dest_tenant_name.compare(src_tenant_name) == 0) &&
       (dest_bucket_name.compare(src_bucket_name) == 0) &&
-      (dest_object.compare(src_object.name) == 0) &&
-      src_object.instance.empty() &&
-      (attrs_mod != RGWRados::ATTRSMOD_REPLACE)) {
+      (dest_obj_name.compare(s->src_object->get_name()) == 0) &&
+      s->src_object->get_instance().empty() &&
+      (attrs_mod != rgw::sal::ATTRSMOD_REPLACE)) {
+    need_to_check_storage_class = true;
+  }
+
+  return 0;
+}
+
+int RGWCopyObj_ObjStore_S3::check_storage_class(const rgw_placement_rule& src_placement)
+{
+  if (src_placement == s->dest_placement) {
     /* can only copy object into itself if replacing attrs */
-    ldout(s->cct, 0) << "can't copy object into itself if not replacing attrs"
-                    << dendl;
+    s->err.message = "This copy request is illegal because it is trying to copy "
+      "an object to itself without changing the object's metadata, "
+      "storage class, website redirect location or encryption attributes.";
+    ldpp_dout(this, 0) << s->err.message << dendl;
     return -ERR_INVALID_REQUEST;
   }
   return 0;
@@ -2135,7 +3298,10 @@ void RGWCopyObj_ObjStore_S3::send_partial_response(off_t ofs)
     set_req_state_err(s, op_ret);
     dump_errno(s);
 
-    end_header(s, this, "application/xml");
+    // Explicitly use chunked transfer encoding so that we can stream the result
+    // to the user without having to wait for the full length of it.
+    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+    dump_start(s);
     if (op_ret == 0) {
       s->formatter->open_object_section_in_ns("CopyObjectResult", XMLNS_AWS_S3);
     }
@@ -2155,10 +3321,9 @@ void RGWCopyObj_ObjStore_S3::send_response()
     send_partial_response(0);
 
   if (op_ret == 0) {
-    dump_time(s, "LastModified", &mtime);
-    std::string etag_str = etag.to_str();
-    if (! etag_str.empty()) {
-      s->formatter->dump_string("ETag", std::move(etag_str));
+    dump_time(s, "LastModified", mtime);
+    if (!etag.empty()) {
+      s->formatter->dump_string("ETag", std::move(etag));
     }
     s->formatter->close_section();
     rgw_flush_formatter_and_reset(s, s->formatter);
@@ -2176,26 +3341,35 @@ void RGWGetACLs_ObjStore_S3::send_response()
   dump_body(s, acls);
 }
 
-int RGWPutACLs_ObjStore_S3::get_params()
+int RGWPutACLs_ObjStore_S3::get_params(optional_yield y)
 {
-  int ret =  RGWPutACLs_ObjStore::get_params();
+  int ret =  RGWPutACLs_ObjStore::get_params(y);
   if (ret >= 0) {
     const int ret_auth = do_aws4_auth_completion();
     if (ret_auth < 0) {
       return ret_auth;
     }
+  } else {
+    /* a request body is not required an S3 PutACLs request--n.b.,
+     * s->length is non-null iff a content length was parsed (the
+     * ACP or canned ACL could be in any of 3 headers, don't worry
+     * about that here) */
+    if ((ret == -ERR_LENGTH_REQUIRED) &&
+       !!(s->length)) {
+      return 0;
+    }
   }
   return ret;
 }
 
-int RGWPutACLs_ObjStore_S3::get_policy_from_state(RGWRados *store,
+int RGWPutACLs_ObjStore_S3::get_policy_from_state(rgw::sal::Store* store,
                                                  struct req_state *s,
                                                  stringstream& ss)
 {
   RGWAccessControlPolicy_S3 s3policy(s->cct);
 
   // bucket-* canned acls do not apply to bucket
-  if (s->object.empty()) {
+  if (rgw::sal::Object::empty(s->object.get())) {
     if (s->canned_acl.find("bucket") != string::npos)
       s->canned_acl.clear();
   }
@@ -2218,7 +3392,7 @@ void RGWPutACLs_ObjStore_S3::send_response()
   dump_start(s);
 }
 
-void RGWGetLC_ObjStore_S3::execute()
+void RGWGetLC_ObjStore_S3::execute(optional_yield y)
 {
   config.set_ctx(s->cct);
 
@@ -2228,11 +3402,11 @@ void RGWGetLC_ObjStore_S3::execute()
     return;
   }
 
-  bufferlist::iterator iter(&aiter->second);
+  bufferlist::const_iterator iter{&aiter->second};
   try {
       config.decode(iter);
     } catch (const buffer::error& e) {
-      ldout(s->cct, 0) << __func__ <<  "decode life cycle config failed" << dendl;
+      ldpp_dout(this, 0) << __func__ <<  "decode life cycle config failed" << dendl;
       op_ret = -EIO;
       return;
     }
@@ -2241,7 +3415,7 @@ void RGWGetLC_ObjStore_S3::execute()
 void RGWGetLC_ObjStore_S3::send_response()
 {
   if (op_ret) {
-    if (op_ret == -ENOENT) {   
+    if (op_ret == -ENOENT) {
       set_req_state_err(s, ERR_NO_SUCH_LC);
     } else {
       set_req_state_err(s, op_ret);
@@ -2254,7 +3428,7 @@ void RGWGetLC_ObjStore_S3::send_response()
   if (op_ret < 0)
     return;
 
-  config.dump_xml(s->formatter);
+  encode_xml("LifecycleConfiguration", XMLNS_AWS_S3, config, s->formatter);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
@@ -2271,7 +3445,7 @@ void RGWDeleteLC_ObjStore_S3::send_response()
 {
   if (op_ret == 0)
       op_ret = STATUS_NO_CONTENT;
-  if (op_ret) {   
+  if (op_ret) {
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
@@ -2302,23 +3476,16 @@ void RGWGetCORS_ObjStore_S3::send_response()
   }
 }
 
-int RGWPutCORS_ObjStore_S3::get_params()
+int RGWPutCORS_ObjStore_S3::get_params(optional_yield y)
 {
-  int r;
-  char *data = nullptr;
-  int len = 0;
-  RGWCORSXMLParser_S3 parser(s->cct);
+  RGWCORSXMLParser_S3 parser(this, s->cct);
   RGWCORSConfiguration_S3 *cors_config;
 
   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-  r = rgw_rest_read_all_input(s, &data, &len, max_size, false);
-  if (r < 0) {
-    return r;
-  }
-
-  auto data_deleter = std::unique_ptr<char, decltype(free)*>{data, free};
 
-  r = do_aws4_auth_completion();
+  int r = 0;
+  bufferlist data;
+  std::tie(r, data) = read_all_input(s, max_size, false);
   if (r < 0) {
     return r;
   }
@@ -2327,24 +3494,42 @@ int RGWPutCORS_ObjStore_S3::get_params()
     return -EINVAL;
   }
 
-  if (!data || !parser.parse(data, len, 1)) {
-    return -EINVAL;
+  char* buf = data.c_str();
+  if (!buf || !parser.parse(buf, data.length(), 1)) {
+    return -ERR_MALFORMED_XML;
   }
   cors_config =
     static_cast<RGWCORSConfiguration_S3 *>(parser.find_first(
                                             "CORSConfiguration"));
   if (!cors_config) {
-    return -EINVAL;
+    return -ERR_MALFORMED_XML;
+  }
+
+#define CORS_RULES_MAX_NUM      100
+  int max_num = s->cct->_conf->rgw_cors_rules_max_num;
+  if (max_num < 0) {
+    max_num = CORS_RULES_MAX_NUM;
+  }
+  int cors_rules_num = cors_config->get_rules().size();
+  if (cors_rules_num > max_num) {
+    ldpp_dout(this, 4) << "An cors config can have up to "
+                     << max_num
+                     << " rules, request cors rules num: "
+                     << cors_rules_num << dendl;
+    op_ret = -ERR_INVALID_CORS_RULES_ERROR;
+    s->err.message = "The number of CORS rules should not exceed allowed limit of "
+                     + std::to_string(max_num) + " rules.";
+    return -ERR_INVALID_REQUEST;
   }
 
   // forward bucket cors requests to meta master zone
   if (!store->is_meta_master()) {
     /* only need to keep this data around if we're not meta master */
-    in_data.append(data, len);
+    in_data.append(data);
   }
 
-  if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
-    ldout(s->cct, 15) << "CORSConfiguration";
+  if (s->cct->_conf->subsys.should_gather<ceph_subsys_rgw, 15>()) {
+    ldpp_dout(this, 15) << "CORSConfiguration";
     cors_config->to_xml(*_dout);
     *_dout << dendl;
   }
@@ -2397,6 +3582,45 @@ void RGWOptionsCORS_ObjStore_S3::send_response()
   end_header(s, NULL);
 }
 
+void RGWPutBucketEncryption_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+void RGWGetBucketEncryption_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    if (op_ret == -ENOENT)
+      set_req_state_err(s, ERR_NO_SUCH_BUCKET_ENCRYPTION_CONFIGURATION);
+    else
+      set_req_state_err(s, op_ret);
+  }
+
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  if (!op_ret) {
+    encode_xml("ServerSideEncryptionConfiguration", bucket_encryption_conf, s->formatter);
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+}
+
+void RGWDeleteBucketEncryption_ObjStore_S3::send_response()
+{
+  if (op_ret == 0) {
+    op_ret = STATUS_NO_CONTENT;
+  }
+
+  set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s);
+}
+
 void RGWGetRequestPayment_ObjStore_S3::send_response()
 {
   dump_errno(s);
@@ -2431,7 +3655,7 @@ public:
     if (!field)
       return 0;
 
-    string& s = field->get_data();
+    auto& s = field->get_data();
 
     if (stringcasecmp(s, "Requester") == 0) {
       *requester_pays = true;
@@ -2443,37 +3667,32 @@ public:
   }
 };
 
-int RGWSetRequestPayment_ObjStore_S3::get_params()
+int RGWSetRequestPayment_ObjStore_S3::get_params(optional_yield y)
 {
-  char *data;
-  int len = 0;
   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-  int r = rgw_rest_read_all_input(s, &data, &len, max_size, false);
+
+  int r = 0;
+  std::tie(r, in_data) = read_all_input(s, max_size, false);
 
   if (r < 0) {
     return r;
   }
 
+
   RGWSetRequestPaymentParser parser;
 
   if (!parser.init()) {
-    ldout(s->cct, 0) << "ERROR: failed to initialize parser" << dendl;
-    r = -EIO;
-    goto done;
+    ldpp_dout(this, 0) << "ERROR: failed to initialize parser" << dendl;
+    return -EIO;
   }
 
-  if (!parser.parse(data, len, 1)) {
-    ldout(s->cct, 10) << "failed to parse data: " << data << dendl;
-    r = -EINVAL;
-    goto done;
+  char* buf = in_data.c_str();
+  if (!parser.parse(buf, in_data.length(), 1)) {
+    ldpp_dout(this, 10) << "failed to parse data: " << buf << dendl;
+    return -EINVAL;
   }
 
-  r = parser.get_request_payment_payer(&requester_pays);
-
-done:
-  free(data);
-
-  return r;
+  return parser.get_request_payment_payer(&requester_pays);
 }
 
 void RGWSetRequestPayment_ObjStore_S3::send_response()
@@ -2484,7 +3703,7 @@ void RGWSetRequestPayment_ObjStore_S3::send_response()
   end_header(s);
 }
 
-int RGWInitMultipart_ObjStore_S3::get_params()
+int RGWInitMultipart_ObjStore_S3::get_params(optional_yield y)
 {
   RGWAccessControlPolicy_S3 s3policy(s->cct);
   op_ret = create_s3_policy(s, store, s3policy, s->owner);
@@ -2503,6 +3722,13 @@ void RGWInitMultipart_ObjStore_S3::send_response()
   dump_errno(s);
   for (auto &it : crypt_http_responses)
      dump_header(s, it.first, it.second);
+  ceph::real_time abort_date;
+  string rule_id;
+  bool exist_multipart_abort = get_s3_multipart_abort_header(s, mtime, abort_date, rule_id);
+  if (exist_multipart_abort) {
+    dump_time_header(s, "x-amz-abort-date", abort_date);
+    dump_header_if_nonempty(s, "x-amz-abort-rule-id", rule_id);
+  }
   end_header(s, this, "application/xml");
   if (op_ret == 0) {
     dump_start(s);
@@ -2510,7 +3736,7 @@ void RGWInitMultipart_ObjStore_S3::send_response()
     if (!s->bucket_tenant.empty())
       s->formatter->dump_string("Tenant", s->bucket_tenant);
     s->formatter->dump_string("Bucket", s->bucket_name);
-    s->formatter->dump_string("Key", s->object.name);
+    s->formatter->dump_string("Key", s->object->get_name());
     s->formatter->dump_string("UploadId", upload_id);
     s->formatter->close_section();
     rgw_flush_formatter_and_reset(s, s->formatter);
@@ -2524,13 +3750,15 @@ int RGWInitMultipart_ObjStore_S3::prepare_encryption(map<string, bufferlist>& at
   return res;
 }
 
-int RGWCompleteMultipart_ObjStore_S3::get_params()
+int RGWCompleteMultipart_ObjStore_S3::get_params(optional_yield y)
 {
-  int ret = RGWCompleteMultipart_ObjStore::get_params();
+  int ret = RGWCompleteMultipart_ObjStore::get_params(y);
   if (ret < 0) {
     return ret;
   }
 
+  map_qs_metadata(s);
+
   return do_aws4_auth_completion();
 }
 
@@ -2539,6 +3767,7 @@ void RGWCompleteMultipart_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
+  dump_header_if_nonempty(s, "x-amz-version-id", version_id);
   end_header(s, this, "application/xml");
   if (op_ret == 0) {
     dump_start(s);
@@ -2549,18 +3778,18 @@ void RGWCompleteMultipart_ObjStore_S3::send_response()
          base_uri.c_str(),
          s->bucket_tenant.c_str(),
          s->bucket_name.c_str(),
-         s->object.name.c_str()
+         s->object->get_name().c_str()
           );
       s->formatter->dump_string("Tenant", s->bucket_tenant);
     } else {
       s->formatter->dump_format("Location", "%s/%s/%s",
          base_uri.c_str(),
          s->bucket_name.c_str(),
-         s->object.name.c_str()
+         s->object->get_name().c_str()
           );
     }
     s->formatter->dump_string("Bucket", s->bucket_name);
-    s->formatter->dump_string("Key", s->object.name);
+    s->formatter->dump_string("Key", s->object->get_name());
     s->formatter->dump_string("ETag", etag);
     s->formatter->close_section();
     rgw_flush_formatter_and_reset(s, s->formatter);
@@ -2583,26 +3812,28 @@ void RGWListMultipart_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
 
   if (op_ret == 0) {
     dump_start(s);
     s->formatter->open_object_section_in_ns("ListPartsResult", XMLNS_AWS_S3);
-    map<uint32_t, RGWUploadPartInfo>::iterator iter;
-    map<uint32_t, RGWUploadPartInfo>::reverse_iterator test_iter;
+    map<uint32_t, std::unique_ptr<rgw::sal::MultipartPart>>::iterator iter;
+    map<uint32_t, std::unique_ptr<rgw::sal::MultipartPart>>::reverse_iterator test_iter;
     int cur_max = 0;
 
-    iter = parts.begin();
-    test_iter = parts.rbegin();
-    if (test_iter != parts.rend()) {
+    iter = upload->get_parts().begin();
+    test_iter = upload->get_parts().rbegin();
+    if (test_iter != upload->get_parts().rend()) {
       cur_max = test_iter->first;
     }
     if (!s->bucket_tenant.empty())
       s->formatter->dump_string("Tenant", s->bucket_tenant);
     s->formatter->dump_string("Bucket", s->bucket_name);
-    s->formatter->dump_string("Key", s->object.name);
+    s->formatter->dump_string("Key", s->object->get_name());
     s->formatter->dump_string("UploadId", upload_id);
-    s->formatter->dump_string("StorageClass", "STANDARD");
+    s->formatter->dump_string("StorageClass", placement->get_storage_class());
     s->formatter->dump_int("PartNumberMarker", marker);
     s->formatter->dump_int("NextPartNumberMarker", cur_max);
     s->formatter->dump_int("MaxParts", max_parts);
@@ -2611,16 +3842,16 @@ void RGWListMultipart_ObjStore_S3::send_response()
     ACLOwner& owner = policy.get_owner();
     dump_owner(s, owner.get_id(), owner.get_display_name());
 
-    for (; iter != parts.end(); ++iter) {
-      RGWUploadPartInfo& info = iter->second;
+    for (; iter != upload->get_parts().end(); ++iter) {
+      rgw::sal::MultipartPart* part = iter->second.get();
 
       s->formatter->open_object_section("Part");
 
-      dump_time(s, "LastModified", &info.modified);
+      dump_time(s, "LastModified", part->get_mtime());
 
-      s->formatter->dump_unsigned("PartNumber", info.num);
-      s->formatter->dump_format("ETag", "\"%s\"", info.etag.c_str());
-      s->formatter->dump_unsigned("Size", info.accounted_size);
+      s->formatter->dump_unsigned("PartNumber", part->get_num());
+      s->formatter->dump_format("ETag", "\"%s\"", part->get_etag().c_str());
+      s->formatter->dump_unsigned("Size", part->get_size());
       s->formatter->close_section();
     }
     s->formatter->close_section();
@@ -2634,7 +3865,9 @@ void RGWListBucketMultiparts_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   dump_errno(s);
 
-  end_header(s, this, "application/xml");
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
   if (op_ret < 0)
     return;
@@ -2644,43 +3877,46 @@ void RGWListBucketMultiparts_ObjStore_S3::send_response()
     s->formatter->dump_string("Tenant", s->bucket_tenant);
   s->formatter->dump_string("Bucket", s->bucket_name);
   if (!prefix.empty())
-    s->formatter->dump_string("ListMultipartUploadsResult.Prefix", prefix);
-  string& key_marker = marker.get_key();
-  if (!key_marker.empty())
-    s->formatter->dump_string("KeyMarker", key_marker);
-  string& upload_id_marker = marker.get_upload_id();
-  if (!upload_id_marker.empty())
-    s->formatter->dump_string("UploadIdMarker", upload_id_marker);
-  string next_key = next_marker.mp.get_key();
-  if (!next_key.empty())
-    s->formatter->dump_string("NextKeyMarker", next_key);
-  string next_upload_id = next_marker.mp.get_upload_id();
-  if (!next_upload_id.empty())
-    s->formatter->dump_string("NextUploadIdMarker", next_upload_id);
+    s->formatter->dump_string("Prefix", prefix);
+  if (!marker_key.empty())
+    s->formatter->dump_string("KeyMarker", marker_key);
+  if (!marker_upload_id.empty())
+    s->formatter->dump_string("UploadIdMarker", marker_upload_id);
+  if (!next_marker_key.empty())
+    s->formatter->dump_string("NextKeyMarker", next_marker_key);
+  if (!next_marker_upload_id.empty())
+    s->formatter->dump_string("NextUploadIdMarker", next_marker_upload_id);
   s->formatter->dump_int("MaxUploads", max_uploads);
   if (!delimiter.empty())
     s->formatter->dump_string("Delimiter", delimiter);
   s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false"));
 
   if (op_ret >= 0) {
-    vector<RGWMultipartUploadEntry>::iterator iter;
+    vector<std::unique_ptr<rgw::sal::MultipartUpload>>::iterator iter;
     for (iter = uploads.begin(); iter != uploads.end(); ++iter) {
-      RGWMPObj& mp = iter->mp;
+      rgw::sal::MultipartUpload* upload = iter->get();
       s->formatter->open_array_section("Upload");
-      s->formatter->dump_string("Key", mp.get_key());
-      s->formatter->dump_string("UploadId", mp.get_upload_id());
-      dump_owner(s, s->user->user_id, s->user->display_name, "Initiator");
-      dump_owner(s, s->user->user_id, s->user->display_name);
+      if (encode_url) {
+        s->formatter->dump_string("Key", url_encode(upload->get_key(), false));
+      } else {
+        s->formatter->dump_string("Key", upload->get_key());
+      }
+      s->formatter->dump_string("UploadId", upload->get_upload_id());
+      const ACLOwner& owner = upload->get_owner();
+      dump_owner(s, owner.get_id(), owner.get_display_name(), "Initiator");
+      dump_owner(s, owner.get_id(), owner.get_display_name()); // Owner
       s->formatter->dump_string("StorageClass", "STANDARD");
-      dump_time(s, "Initiated", &iter->obj.meta.mtime);
+      dump_time(s, "Initiated", upload->get_mtime());
       s->formatter->close_section();
     }
     if (!common_prefixes.empty()) {
       s->formatter->open_array_section("CommonPrefixes");
-      map<string, bool>::iterator pref_iter;
-      for (pref_iter = common_prefixes.begin();
-          pref_iter != common_prefixes.end(); ++pref_iter) {
-       s->formatter->dump_string("CommonPrefixes.Prefix", pref_iter->first);
+      for (const auto& kv : common_prefixes) {
+        if (encode_url) {
+          s->formatter->dump_string("Prefix", url_encode(kv.first, false));
+        } else {
+          s->formatter->dump_string("Prefix", kv.first);
+        }
       }
       s->formatter->close_section();
     }
@@ -2689,13 +3925,19 @@ void RGWListBucketMultiparts_ObjStore_S3::send_response()
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-int RGWDeleteMultiObj_ObjStore_S3::get_params()
+int RGWDeleteMultiObj_ObjStore_S3::get_params(optional_yield y)
 {
-  int ret = RGWDeleteMultiObj_ObjStore::get_params();
+  int ret = RGWDeleteMultiObj_ObjStore::get_params(y);
   if (ret < 0) {
     return ret;
   }
 
+  const char *bypass_gov_header = s->info.env->get("HTTP_X_AMZ_BYPASS_GOVERNANCE_RETENTION");
+  if (bypass_gov_header) {
+    std::string bypass_gov_decoded = url_decode(bypass_gov_header);
+    bypass_governance_mode = boost::algorithm::iequals(bypass_gov_decoded, "true");
+  }
+
   return do_aws4_auth_completion();
 }
 
@@ -2717,7 +3959,9 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response()
   }
 
   dump_start(s);
-  end_header(s, this, "application/xml");
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
   s->formatter->open_object_section_in_ns("DeleteResult", XMLNS_AWS_S3);
 
   rgw_flush_formatter(s, s->formatter);
@@ -2728,7 +3972,7 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
                                                          const string& marker_version_id, int ret)
 {
   if (!key.empty()) {
-    if (op_ret == 0 && !quiet) {
+    if (ret == 0 && !quiet) {
       s->formatter->open_object_section("Deleted");
       s->formatter->dump_string("Key", key.name);
       if (!key.instance.empty()) {
@@ -2739,18 +3983,18 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
        s->formatter->dump_string("DeleteMarkerVersionId", marker_version_id);
       }
       s->formatter->close_section();
-    } else if (op_ret < 0) {
+    } else if (ret < 0) {
       struct rgw_http_error r;
       int err_no;
 
       s->formatter->open_object_section("Error");
 
-      err_no = -op_ret;
+      err_no = -ret;
       rgw_get_errno_s3(&r, err_no);
 
       s->formatter->dump_string("Key", key.name);
       s->formatter->dump_string("VersionId", key.instance);
-      s->formatter->dump_int("Code", r.http_ret);
+      s->formatter->dump_string("Code", r.s3_code);
       s->formatter->dump_string("Message", r.s3_code);
       s->formatter->close_section();
     }
@@ -2780,30 +4024,17 @@ void RGWGetObjLayout_ObjStore_S3::send_response()
   }
 
   f.open_object_section("result");
-  ::encode_json("head", head_obj, &f);
-  ::encode_json("manifest", *manifest, &f);
-  f.open_array_section("data_location");
-  for (auto miter = manifest->obj_begin(); miter != manifest->obj_end(); ++miter) {
-    f.open_object_section("obj");
-    rgw_raw_obj raw_loc = miter.get_location().get_raw_obj(store);
-    ::encode_json("ofs", miter.get_ofs(), &f);
-    ::encode_json("loc", raw_loc, &f);
-    ::encode_json("loc_ofs", miter.location_ofs(), &f);
-    ::encode_json("loc_size", miter.get_stripe_size(), &f);
-    f.close_section();
-    rgw_flush_formatter(s, &f);
-  }
-  f.close_section();
+  s->object->dump_obj_layout(this, s->yield, &f, s->obj_ctx);
   f.close_section();
   rgw_flush_formatter(s, &f);
 }
 
-int RGWConfigBucketMetaSearch_ObjStore_S3::get_params()
+int RGWConfigBucketMetaSearch_ObjStore_S3::get_params(optional_yield y)
 {
   auto iter = s->info.x_meta_map.find("x-amz-meta-search");
   if (iter == s->info.x_meta_map.end()) {
     s->err.message = "X-Rgw-Meta-Search header not provided";
-    ldout(s->cct, 5) << s->err.message << dendl;
+    ldpp_dout(this, 5) << s->err.message << dendl;
     return -EINVAL;
   }
 
@@ -2816,12 +4047,12 @@ int RGWConfigBucketMetaSearch_ObjStore_S3::get_params()
 
     if (args.empty()) {
       s->err.message = "invalid empty expression";
-      ldout(s->cct, 5) << s->err.message << dendl;
+      ldpp_dout(this, 5) << s->err.message << dendl;
       return -EINVAL;
     }
     if (args.size() > 2) {
       s->err.message = string("invalid expression: ") + expression;
-      ldout(s->cct, 5) << s->err.message << dendl;
+      ldpp_dout(this, 5) << s->err.message << dendl;
       return -EINVAL;
     }
 
@@ -2833,7 +4064,7 @@ int RGWConfigBucketMetaSearch_ObjStore_S3::get_params()
 
     if (!boost::algorithm::starts_with(key, RGW_AMZ_META_PREFIX)) {
       s->err.message = string("invalid expression, key must start with '" RGW_AMZ_META_PREFIX "' : ") + expression;
-      ldout(s->cct, 5) << s->err.message << dendl;
+      ldpp_dout(this, 5) << s->err.message << dendl;
       return -EINVAL;
     }
 
@@ -2849,7 +4080,7 @@ int RGWConfigBucketMetaSearch_ObjStore_S3::get_params()
       entity_type = ESEntityTypeMap::ES_ENTITY_DATE;
     } else {
       s->err.message = string("invalid entity type: ") + val;
-      ldout(s->cct, 5) << s->err.message << dendl;
+      ldpp_dout(this, 5) << s->err.message << dendl;
       return -EINVAL;
     }
 
@@ -2876,7 +4107,7 @@ void RGWGetBucketMetaSearch_ObjStore_S3::send_response()
 
   Formatter *f = s->formatter;
   f->open_array_section("GetBucketMetaSearchResult");
-  for (auto& e : s->bucket_info.mdsearch_config) {
+  for (auto& e : s->bucket->get_info().mdsearch_config) {
     f->open_object_section("Entry");
     string k = string("x-amz-meta-") + e.first;
     f->dump_string("Key", k.c_str());
@@ -2906,6 +4137,136 @@ void RGWDelBucketMetaSearch_ObjStore_S3::send_response()
   end_header(s, this);
 }
 
+void RGWPutBucketObjectLock_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+void RGWGetBucketObjectLock_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  if (op_ret) {
+    return;
+  }
+  encode_xml("ObjectLockConfiguration", s->bucket->get_info().obj_lock, s->formatter);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+
+int RGWPutObjRetention_ObjStore_S3::get_params(optional_yield y)
+{
+  const char *bypass_gov_header = s->info.env->get("HTTP_X_AMZ_BYPASS_GOVERNANCE_RETENTION");
+  if (bypass_gov_header) {
+    std::string bypass_gov_decoded = url_decode(bypass_gov_header);
+    bypass_governance_mode = boost::algorithm::iequals(bypass_gov_decoded, "true");
+  }
+
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  std::tie(op_ret, data) = read_all_input(s, max_size, false);
+  return op_ret;
+}
+
+void RGWPutObjRetention_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+void RGWGetObjRetention_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  if (op_ret) {
+    return;
+  }
+  encode_xml("Retention", obj_retention, s->formatter);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void RGWPutObjLegalHold_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+void RGWGetObjLegalHold_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  if (op_ret) {
+    return;
+  }
+  encode_xml("LegalHold", obj_legal_hold, s->formatter);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void RGWGetBucketPolicyStatus_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  s->formatter->open_object_section_in_ns("PolicyStatus", XMLNS_AWS_S3);
+  // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETPolicyStatus.html
+  // mentions TRUE and FALSE, but boto/aws official clients seem to want lower
+  // case which is returned by AWS as well; so let's be bug to bug compatible
+  // with the API
+  s->formatter->dump_bool("IsPublic", isPublic);
+  s->formatter->close_section();
+  rgw_flush_formatter_and_reset(s, s->formatter);
+
+}
+
+void RGWPutBucketPublicAccessBlock_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s);
+}
+
+void RGWGetBucketPublicAccessBlock_ObjStore_S3::send_response()
+{
+  if (op_ret) {
+    set_req_state_err(s, op_ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+
+  access_conf.dump_xml(s->formatter);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
 
 RGWOp *RGWHandler_REST_Service_S3::op_get()
 {
@@ -2914,44 +4275,71 @@ RGWOp *RGWHandler_REST_Service_S3::op_get()
   } else {
     return new RGWListBuckets_ObjStore_S3;
   }
-}
+}
+
+RGWOp *RGWHandler_REST_Service_S3::op_head()
+{
+  return new RGWListBuckets_ObjStore_S3;
+}
+
+RGWOp *RGWHandler_REST_Service_S3::op_post()
+{
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+
+  int ret;
+  bufferlist data;
+  std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
+  if (ret < 0) {
+      return nullptr;
+  }
+
+  const auto post_body = data.to_str();
+
+  if (isSTSEnabled) {
+    RGWHandler_REST_STS sts_handler(auth_registry, post_body);
+    sts_handler.init(store, s, s->cio);
+    auto op = sts_handler.get_op();
+    if (op) {
+      return op;
+    }
+  }
+
+  if (isIAMEnabled) {
+    RGWHandler_REST_IAM iam_handler(auth_registry, post_body);
+    iam_handler.init(store, s, s->cio);
+    auto op = iam_handler.get_op();
+    if (op) {
+      return op;
+    }
+  }
+
+  if (isPSEnabled) {
+    RGWHandler_REST_PSTopic_AWS topic_handler(auth_registry, post_body);
+    topic_handler.init(store, s, s->cio);
+    auto op = topic_handler.get_op();
+    if (op) {
+      return op;
+    }
+  }
 
-RGWOp *RGWHandler_REST_Service_S3::op_head()
-{
-  return new RGWListBuckets_ObjStore_S3;
+  return nullptr;
 }
 
-RGWOp *RGWHandler_REST_Service_S3::op_post()
-{
-  if (s->info.args.exists("Action")) {
-    string action = s->info.args.get("Action");
-    if (action.compare("CreateRole") == 0)
-      return new RGWCreateRole;
-    if (action.compare("DeleteRole") == 0)
-      return new RGWDeleteRole;
-    if (action.compare("GetRole") == 0)
-      return new RGWGetRole;
-    if (action.compare("UpdateAssumeRolePolicy") == 0)
-      return new RGWModifyRole;
-    if (action.compare("ListRoles") == 0)
-      return new RGWListRoles;
-    if (action.compare("PutRolePolicy") == 0)
-      return new RGWPutRolePolicy;
-    if (action.compare("GetRolePolicy") == 0)
-      return new RGWGetRolePolicy;
-    if (action.compare("ListRolePolicies") == 0)
-      return new RGWListRolePolicies;
-    if (action.compare("DeleteRolePolicy") == 0)
-      return new RGWDeleteRolePolicy;
-  }
-  return NULL;
-}
-
-RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
+RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data) const
 {
   // Non-website mode
   if (get_data) {
-    return new RGWListBucket_ObjStore_S3;
+    int list_type = 1;
+    s->info.args.get_int("list-type", &list_type, 1);
+    switch (list_type) {
+      case 1:
+        return new RGWListBucket_ObjStore_S3;
+      case 2:
+        return new RGWListBucket_ObjStore_S3v2;
+      default:
+        ldpp_dout(s, 5) << __func__ << ": unsupported list-type " << list_type << dendl;
+        return new RGWListBucket_ObjStore_S3;
+    }
   } else {
     return new RGWStatBucket_ObjStore_S3;
   }
@@ -2959,6 +4347,9 @@ RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
 
 RGWOp *RGWHandler_REST_Bucket_S3::op_get()
 {
+  if (s->info.args.sub_resource_exists("encryption"))
+    return nullptr;
+
   if (s->info.args.sub_resource_exists("logging"))
     return new RGWGetBucketLogging_ObjStore_S3;
 
@@ -2991,6 +4382,20 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get()
     return new RGWGetLC_ObjStore_S3;
   } else if(is_policy_op()) {
     return new RGWGetBucketPolicy;
+  } else if (is_tagging_op()) {
+    return new RGWGetBucketTags_ObjStore_S3;
+  } else if (is_object_lock_op()) {
+    return new RGWGetBucketObjectLock_ObjStore_S3;
+  } else if (is_notification_op()) {
+    return RGWHandler_REST_PSNotifs_S3::create_get_op();
+  } else if (is_replication_op()) {
+    return new RGWGetBucketReplication_ObjStore_S3;
+  } else if (is_policy_status_op()) {
+    return new RGWGetBucketPolicyStatus_ObjStore_S3;
+  } else if (is_block_public_access_op()) {
+    return new RGWGetBucketPublicAccessBlock_ObjStore_S3;
+  } else if (is_bucket_encryption_op()) {
+    return new RGWGetBucketEncryption_ObjStore_S3;
   }
   return get_obj_op(true);
 }
@@ -3007,8 +4412,9 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_head()
 
 RGWOp *RGWHandler_REST_Bucket_S3::op_put()
 {
-  if (s->info.args.sub_resource_exists("logging"))
-    return NULL;
+  if (s->info.args.sub_resource_exists("logging") ||
+      s->info.args.sub_resource_exists("encryption"))
+    return nullptr;
   if (s->info.args.sub_resource_exists("versioning"))
     return new RGWSetBucketVersioning_ObjStore_S3;
   if (s->info.args.sub_resource_exists("website")) {
@@ -3017,7 +4423,9 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
     }
     return new RGWSetBucketWebsite_ObjStore_S3;
   }
-  if (is_acl_op()) {
+  if (is_tagging_op()) {
+    return new RGWPutBucketTags_ObjStore_S3;
+  } else if (is_acl_op()) {
     return new RGWPutACLs_ObjStore_S3;
   } else if (is_cors_op()) {
     return new RGWPutCORS_ObjStore_S3;
@@ -3027,18 +4435,48 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
     return new RGWPutLC_ObjStore_S3;
   } else if(is_policy_op()) {
     return new RGWPutBucketPolicy;
+  } else if (is_object_lock_op()) {
+    return new RGWPutBucketObjectLock_ObjStore_S3;
+  } else if (is_notification_op()) {
+    return RGWHandler_REST_PSNotifs_S3::create_put_op();
+  } else if (is_replication_op()) {
+    auto sync_policy_handler = static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->get_sync_policy_handler(nullopt);
+    if (!sync_policy_handler ||
+        sync_policy_handler->is_legacy_config()) {
+      return nullptr;
+    }
+
+    return new RGWPutBucketReplication_ObjStore_S3;
+  } else if (is_block_public_access_op()) {
+    return new RGWPutBucketPublicAccessBlock_ObjStore_S3;
+  } else if (is_bucket_encryption_op()) {
+    return new RGWPutBucketEncryption_ObjStore_S3;
   }
   return new RGWCreateBucket_ObjStore_S3;
 }
 
 RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
 {
-  if (is_cors_op()) {
+  if (s->info.args.sub_resource_exists("logging") ||
+      s->info.args.sub_resource_exists("encryption"))
+    return nullptr;
+
+  if (is_tagging_op()) {
+    return new RGWDeleteBucketTags_ObjStore_S3;
+  } else if (is_cors_op()) {
     return new RGWDeleteCORS_ObjStore_S3;
   } else if(is_lc_op()) {
     return new RGWDeleteLC_ObjStore_S3;
   } else if(is_policy_op()) {
     return new RGWDeleteBucketPolicy;
+  } else if (is_notification_op()) {
+    return RGWHandler_REST_PSNotifs_S3::create_delete_op();
+  } else if (is_replication_op()) {
+    return new RGWDeleteBucketReplication_ObjStore_S3;
+  } else if (is_block_public_access_op()) {
+    return new RGWDeleteBucketPublicAccessBlock;
+  } else if (is_bucket_encryption_op()) {
+    return new RGWDeleteBucketEncryption_ObjStore_S3;
   }
 
   if (s->info.args.sub_resource_exists("website")) {
@@ -3075,9 +4513,6 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_options()
 
 RGWOp *RGWHandler_REST_Obj_S3::get_obj_op(bool get_data)
 {
-  if (is_acl_op()) {
-    return new RGWGetACLs_ObjStore_S3;
-  }
   RGWGetObj_ObjStore_S3 *get_obj_op = new RGWGetObj_ObjStore_S3;
   get_obj_op->set_get_data(get_data);
   return get_obj_op;
@@ -3093,6 +4528,10 @@ RGWOp *RGWHandler_REST_Obj_S3::op_get()
     return new RGWGetObjLayout_ObjStore_S3;
   } else if (is_tagging_op()) {
     return new RGWGetObjTags_ObjStore_S3;
+  } else if (is_obj_retention_op()) {
+    return new RGWGetObjRetention_ObjStore_S3;
+  } else if (is_obj_legal_hold_op()) {
+    return new RGWGetObjLegalHold_ObjStore_S3;
   }
   return get_obj_op(true);
 }
@@ -3113,6 +4552,10 @@ RGWOp *RGWHandler_REST_Obj_S3::op_put()
     return new RGWPutACLs_ObjStore_S3;
   } else if (is_tagging_op()) {
     return new RGWPutObjTags_ObjStore_S3;
+  } else if (is_obj_retention_op()) {
+    return new RGWPutObjRetention_ObjStore_S3;
+  } else if (is_obj_legal_hold_op()) {
+    return new RGWPutObjLegalHold_ObjStore_S3;
   }
 
   if (s->init_state.src_bucket.empty())
@@ -3141,6 +4584,9 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()
 
   if (s->info.args.exists("uploads"))
     return new RGWInitMultipart_ObjStore_S3;
+  
+  if (is_select_op())
+    return rgw::s3select::create_s3select_op();
 
   return new RGWPostObj_ObjStore_S3;
 }
@@ -3150,9 +4596,10 @@ RGWOp *RGWHandler_REST_Obj_S3::op_options()
   return new RGWOptionsCORS_ObjStore_S3;
 }
 
-int RGWHandler_REST_S3::init_from_header(struct req_state* s,
-                                       int default_formatter,
-                                       bool configurable_format)
+int RGWHandler_REST_S3::init_from_header(rgw::sal::Store* store,
+                                        struct req_state* s,
+                                        int default_formatter,
+                                        bool configurable_format)
 {
   string req;
   string first;
@@ -3167,7 +4614,7 @@ int RGWHandler_REST_S3::init_from_header(struct req_state* s,
   }
 
   s->info.args.set(p);
-  s->info.args.parse();
+  s->info.args.parse(s);
 
   /* must be called after the args parsing */
   int ret = allocate_formatter(s, default_formatter, configurable_format);
@@ -3203,54 +4650,108 @@ int RGWHandler_REST_S3::init_from_header(struct req_state* s,
   if (s->init_state.url_bucket.empty()) {
     // Save bucket to tide us over until token is parsed.
     s->init_state.url_bucket = first;
+    string encoded_obj_str;
     if (pos >= 0) {
-      string encoded_obj_str = req.substr(pos+1);
-      s->object = rgw_obj_key(encoded_obj_str, s->info.args.get("versionId"));
+      encoded_obj_str = req.substr(pos+1);
+    }
+
+    /* dang: s->bucket is never set here, since it's created with permissions.
+     * These calls will always create an object with no bucket. */
+    if (!encoded_obj_str.empty()) {
+      if (s->bucket) {
+       s->object = s->bucket->get_object(rgw_obj_key(encoded_obj_str, s->info.args.get("versionId")));
+      } else {
+       s->object = store->get_object(rgw_obj_key(encoded_obj_str, s->info.args.get("versionId")));
+      }
     }
   } else {
-    s->object = rgw_obj_key(req_name, s->info.args.get("versionId"));
+    if (s->bucket) {
+      s->object = s->bucket->get_object(rgw_obj_key(req_name, s->info.args.get("versionId")));
+    } else {
+      s->object = store->get_object(rgw_obj_key(req_name, s->info.args.get("versionId")));
+    }
   }
   return 0;
 }
 
-int RGWHandler_REST_S3::postauth_init()
+static int verify_mfa(rgw::sal::Store* store, RGWUserInfo *user,
+                     const string& mfa_str, bool *verified, const DoutPrefixProvider *dpp, optional_yield y)
+{
+  vector<string> params;
+  get_str_vec(mfa_str, " ", params);
+
+  if (params.size() != 2) {
+    ldpp_dout(dpp, 5) << "NOTICE: invalid mfa string provided: " << mfa_str << dendl;
+    return -EINVAL;
+  }
+
+  string& serial = params[0];
+  string& pin = params[1];
+
+  auto i = user->mfa_ids.find(serial);
+  if (i == user->mfa_ids.end()) {
+    ldpp_dout(dpp, 5) << "NOTICE: user does not have mfa device with serial=" << serial << dendl;
+    return -EACCES;
+  }
+
+  int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->cls->mfa.check_mfa(dpp, user->user_id, serial, pin, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 20) << "NOTICE: failed to check MFA, serial=" << serial << dendl;
+    return -EACCES;
+  }
+
+  *verified = true;
+
+  return 0;
+}
+
+int RGWHandler_REST_S3::postauth_init(optional_yield y)
 {
   struct req_init_state *t = &s->init_state;
-  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
 
-  rgw_parse_url_bucket(t->url_bucket, s->user->user_id.tenant,
+  rgw_parse_url_bucket(t->url_bucket, s->user->get_tenant(),
                      s->bucket_tenant, s->bucket_name);
 
-  dout(10) << "s->object=" << (!s->object.empty() ? s->object : rgw_obj_key("<NULL>"))
+  if (s->auth.identity->get_identity_type() == TYPE_ROLE) {
+    s->bucket_tenant = s->auth.identity->get_role_tenant();
+  }
+
+  ldpp_dout(s, 10) << "s->object=" << s->object
            << " s->bucket=" << rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name) << dendl;
 
   int ret;
   ret = rgw_validate_tenant_name(s->bucket_tenant);
   if (ret)
     return ret;
-  if (!s->bucket_name.empty()) {
-    ret = valid_s3_bucket_name(s->bucket_name, relaxed_names);
-    if (ret)
-      return ret;
-    ret = validate_object_name(s->object.name);
+  if (!s->bucket_name.empty() && !rgw::sal::Object::empty(s->object.get())) {
+    ret = validate_object_name(s->object->get_name());
     if (ret)
       return ret;
   }
 
   if (!t->src_bucket.empty()) {
-    rgw_parse_url_bucket(t->src_bucket, s->user->user_id.tenant,
+    string auth_tenant;
+    if (s->auth.identity->get_identity_type() == TYPE_ROLE) {
+      auth_tenant = s->auth.identity->get_role_tenant();
+    } else {
+      auth_tenant = s->user->get_tenant();
+    }
+    rgw_parse_url_bucket(t->src_bucket, auth_tenant,
                        s->src_tenant_name, s->src_bucket_name);
     ret = rgw_validate_tenant_name(s->src_tenant_name);
     if (ret)
       return ret;
-    ret = valid_s3_bucket_name(s->src_bucket_name, relaxed_names);
-    if (ret)
-      return ret;
   }
+
+  const char *mfa = s->info.env->get("HTTP_X_AMZ_MFA");
+  if (mfa) {
+    ret = verify_mfa(store, &s->user->get_info(), string(mfa), &s->mfa_verified, s, y);
+  }
+
   return 0;
 }
 
-int RGWHandler_REST_S3::init(RGWRados *store, struct req_state *s,
+int RGWHandler_REST_S3::init(rgw::sal::Store* store, struct req_state *s,
                              rgw::io::BasicClient *cio)
 {
   int ret;
@@ -3260,12 +4761,8 @@ int RGWHandler_REST_S3::init(RGWRados *store, struct req_state *s,
   ret = rgw_validate_tenant_name(s->bucket_tenant);
   if (ret)
     return ret;
-  bool relaxed_names = s->cct->_conf->rgw_relaxed_s3_bucket_names;
   if (!s->bucket_name.empty()) {
-    ret = valid_s3_bucket_name(s->bucket_name, relaxed_names);
-    if (ret)
-      return ret;
-    ret = validate_object_name(s->object.name);
+    ret = validate_object_name(s->object->get_name());
     if (ret)
       return ret;
   }
@@ -3280,27 +4777,43 @@ int RGWHandler_REST_S3::init(RGWRados *store, struct req_state *s,
   if (copy_source &&
       (! s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE")) &&
       (! s->info.args.exists("uploadId"))) {
+    rgw_obj_key key;
 
-    ret = RGWCopyObj::parse_copy_location(url_decode(copy_source),
+    ret = RGWCopyObj::parse_copy_location(copy_source,
                                           s->init_state.src_bucket,
-                                          s->src_object);
+                                          key,
+                                          s);
     if (!ret) {
-      ldout(s->cct, 0) << "failed to parse copy location" << dendl;
+      ldpp_dout(s, 0) << "failed to parse copy location" << dendl;
       return -EINVAL; // XXX why not -ERR_INVALID_BUCKET_NAME or -ERR_BAD_URL?
     }
+    s->src_object = store->get_object(key);
+  }
+
+  const char *sc = s->info.env->get("HTTP_X_AMZ_STORAGE_CLASS");
+  if (sc) {
+    s->info.storage_class = sc;
   }
 
   return RGWHandler_REST::init(store, s, cio);
 }
 
+int RGWHandler_REST_S3::authorize(const DoutPrefixProvider *dpp, optional_yield y)
+{
+  if (s->info.args.exists("Action") && s->info.args.get("Action") == "AssumeRoleWithWebIdentity") {
+    return RGW_Auth_STS::authorize(dpp, store, auth_registry, s, y);
+  }
+  return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
+}
+
 enum class AwsVersion {
-  UNKOWN,
+  UNKNOWN,
   V2,
   V4
 };
 
 enum class AwsRoute {
-  UNKOWN,
+  UNKNOWN,
   QUERY_STRING,
   HEADERS
 };
@@ -3310,8 +4823,8 @@ discover_aws_flavour(const req_info& info)
 {
   using rgw::auth::s3::AWS4_HMAC_SHA256_STR;
 
-  AwsVersion version = AwsVersion::UNKOWN;
-  AwsRoute route = AwsRoute::UNKOWN;
+  AwsVersion version = AwsVersion::UNKNOWN;
+  AwsRoute route = AwsRoute::UNKNOWN;
 
   const char* http_auth = info.env->get("HTTP_AUTHORIZATION");
   if (http_auth && http_auth[0]) {
@@ -3329,7 +4842,7 @@ discover_aws_flavour(const req_info& info)
   } else {
     route = AwsRoute::QUERY_STRING;
 
-    if (info.args.get("X-Amz-Algorithm") == AWS4_HMAC_SHA256_STR) {
+    if (info.args.get("x-amz-algorithm") == AWS4_HMAC_SHA256_STR) {
       /* AWS v4 */
       version = AwsVersion::V4;
     } else if (!info.args.get("AWSAccessKeyId").empty()) {
@@ -3341,58 +4854,53 @@ discover_aws_flavour(const req_info& info)
   return std::make_pair(version, route);
 }
 
-static void init_anon_user(struct req_state *s)
-{
-  rgw_get_anon_user(*(s->user));
-  s->perm_mask = RGW_PERM_FULL_CONTROL;
-}
-
 /*
  * verify that a signed request comes from the keyholder
  * by checking the signature against our locally-computed version
  *
  * it tries AWS v4 before AWS v2
  */
-int RGW_Auth_S3::authorize(RGWRados* const store,
+int RGW_Auth_S3::authorize(const DoutPrefixProvider *dpp,
+                           rgw::sal::Store* const store,
                            const rgw::auth::StrategyRegistry& auth_registry,
-                           struct req_state* const s)
+                           struct req_state* const s, optional_yield y)
 {
 
   /* neither keystone and rados enabled; warn and exit! */
   if (!store->ctx()->_conf->rgw_s3_auth_use_rados &&
       !store->ctx()->_conf->rgw_s3_auth_use_keystone &&
       !store->ctx()->_conf->rgw_s3_auth_use_ldap) {
-    dout(0) << "WARNING: no authorization backend enabled! Users will never authenticate." << dendl;
+    ldpp_dout(dpp, 0) << "WARNING: no authorization backend enabled! Users will never authenticate." << dendl;
     return -EPERM;
   }
 
-  const auto ret = rgw::auth::Strategy::apply(auth_registry.get_s3_main(), s);
+  const auto ret = rgw::auth::Strategy::apply(dpp, auth_registry.get_s3_main(), s, y);
   if (ret == 0) {
     /* Populate the owner info. */
-    s->owner.set_id(s->user->user_id);
-    s->owner.set_name(s->user->display_name);
+    s->owner.set_id(s->user->get_id());
+    s->owner.set_name(s->user->get_display_name());
   }
   return ret;
 }
 
-int RGWHandler_Auth_S3::init(RGWRados *store, struct req_state *state,
+int RGWHandler_Auth_S3::init(rgw::sal::Store* store, struct req_state *state,
                              rgw::io::BasicClient *cio)
 {
-  int ret = RGWHandler_REST_S3::init_from_header(state, RGW_FORMAT_JSON,
-                                                    true);
+  int ret = RGWHandler_REST_S3::init_from_header(store, state, RGW_FORMAT_JSON, true);
   if (ret < 0)
     return ret;
 
   return RGWHandler_REST::init(store, state, cio);
 }
 
-RGWHandler_REST* RGWRESTMgr_S3::get_handler(struct req_state* const s,
+RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::Store* store,
+                                           struct req_state* const s,
                                             const rgw::auth::StrategyRegistry& auth_registry,
                                             const std::string& frontend_prefix)
 {
   bool is_s3website = enable_s3website && (s->prot_flags & RGW_REST_WEBSITE);
   int ret =
-    RGWHandler_REST_S3::init_from_header(s,
+    RGWHandler_REST_S3::init_from_header(store, s,
                                        is_s3website ? RGW_FORMAT_HTML :
                                        RGW_FORMAT_XML, true);
   if (ret < 0)
@@ -3403,43 +4911,48 @@ RGWHandler_REST* RGWRESTMgr_S3::get_handler(struct req_state* const s,
   if (is_s3website) {
     if (s->init_state.url_bucket.empty()) {
       handler = new RGWHandler_REST_Service_S3Website(auth_registry);
-    } else if (s->object.empty()) {
+    } else if (rgw::sal::Object::empty(s->object.get())) {
       handler = new RGWHandler_REST_Bucket_S3Website(auth_registry);
     } else {
       handler = new RGWHandler_REST_Obj_S3Website(auth_registry);
     }
   } else {
     if (s->init_state.url_bucket.empty()) {
-      handler = new RGWHandler_REST_Service_S3(auth_registry);
-    } else if (s->object.empty()) {
-      handler = new RGWHandler_REST_Bucket_S3(auth_registry);
-    } else {
+      handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam, enable_pubsub);
+    } else if (!rgw::sal::Object::empty(s->object.get())) {
       handler = new RGWHandler_REST_Obj_S3(auth_registry);
+    } else if (s->info.args.exist_obj_excl_sub_resource()) {
+      return NULL;
+    } else {
+      handler = new RGWHandler_REST_Bucket_S3(auth_registry, enable_pubsub);
     }
   }
 
-  ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name()
+  ldpp_dout(s, 20) << __func__ << " handler=" << typeid(*handler).name()
                    << dendl;
   return handler;
 }
 
 bool RGWHandler_REST_S3Website::web_dir() const {
-  std::string subdir_name = url_decode(s->object.name);
+  std::string subdir_name;
+  if (!rgw::sal::Object::empty(s->object.get())) {
+    subdir_name = url_decode(s->object->get_name());
+  }
 
   if (subdir_name.empty()) {
     return false;
-  } else if (subdir_name.back() == '/') {
+  } else if (subdir_name.back() == '/' && subdir_name.size() > 1) {
     subdir_name.pop_back();
   }
 
-  rgw_obj obj(s->bucket, subdir_name);
+  std::unique_ptr<rgw::sal::Object> obj = s->bucket->get_object(rgw_obj_key(subdir_name));
 
   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-  obj_ctx.obj.set_atomic(obj);
-  obj_ctx.obj.set_prefetch_data(obj);
+  obj->set_atomic(&obj_ctx);
+  obj->set_prefetch_data(&obj_ctx);
 
   RGWObjState* state = nullptr;
-  if (store->get_obj_state(&obj_ctx, s->bucket_info, obj, &state, false) < 0) {
+  if (obj->get_obj_state(s, &obj_ctx, &state, s->yield) < 0) {
     return false;
   }
   if (! state->exists) {
@@ -3448,46 +4961,63 @@ bool RGWHandler_REST_S3Website::web_dir() const {
   return state->exists;
 }
 
-int RGWHandler_REST_S3Website::retarget(RGWOp* op, RGWOp** new_op) {
+int RGWHandler_REST_S3Website::init(rgw::sal::Store* store, req_state *s,
+                                    rgw::io::BasicClient* cio)
+{
+  // save the original object name before retarget() replaces it with the
+  // result of get_effective_key(). the error_handler() needs the original
+  // object name for redirect handling
+  if (!rgw::sal::Object::empty(s->object.get())) {
+    original_object_name = s->object->get_name();
+  } else {
+    original_object_name = "";
+  }
+
+  return RGWHandler_REST_S3::init(store, s, cio);
+}
+
+int RGWHandler_REST_S3Website::retarget(RGWOp* op, RGWOp** new_op, optional_yield y) {
   *new_op = op;
-  ldout(s->cct, 10) << __func__ << "Starting retarget" << dendl;
+  ldpp_dout(s, 10) << __func__ << " Starting retarget" << dendl;
 
   if (!(s->prot_flags & RGW_REST_WEBSITE))
     return 0;
 
-  RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-  int ret = store->get_bucket_info(obj_ctx, s->bucket_tenant,
-                                 s->bucket_name, s->bucket_info, NULL,
-                                 &s->bucket_attrs);
-  if (ret < 0) {
-      // TODO-FUTURE: if the bucket does not exist, maybe expose it here?
-      return -ERR_NO_SUCH_BUCKET;
-  }
-  if (!s->bucket_info.has_website) {
+  if (!s->bucket->get_info().has_website) {
       // TODO-FUTURE: if the bucket has no WebsiteConfig, expose it here
       return -ERR_NO_SUCH_WEBSITE_CONFIGURATION;
   }
 
   rgw_obj_key new_obj;
-  s->bucket_info.website_conf.get_effective_key(s->object.name, &new_obj.name, web_dir());
-  ldout(s->cct, 10) << "retarget get_effective_key " << s->object << " -> "
+  string key_name;
+  if (!rgw::sal::Object::empty(s->object.get())) {
+    key_name = s->object->get_name();
+  }
+  bool get_res = s->bucket->get_info().website_conf.get_effective_key(key_name, &new_obj.name, web_dir());
+  if (!get_res) {
+    s->err.message = "The IndexDocument Suffix is not configurated or not well formed!";
+    ldpp_dout(s, 5) << s->err.message << dendl;
+    return -EINVAL;
+  }
+
+  ldpp_dout(s, 10) << "retarget get_effective_key " << s->object << " -> "
                    << new_obj << dendl;
 
   RGWBWRoutingRule rrule;
   bool should_redirect =
-    s->bucket_info.website_conf.should_redirect(new_obj.name, 0, &rrule);
+    s->bucket->get_info().website_conf.should_redirect(new_obj.name, 0, &rrule);
 
   if (should_redirect) {
     const string& hostname = s->info.env->get("HTTP_HOST", "");
     const string& protocol =
       (s->info.env->get("SERVER_PORT_SECURE") ? "https" : "http");
     int redirect_code = 0;
-    rrule.apply_rule(protocol, hostname, s->object.name, &s->redirect,
+    rrule.apply_rule(protocol, hostname, key_name, &s->redirect,
                    &redirect_code);
     // APply a custom HTTP response code
     if (redirect_code > 0)
       s->err.http_ret = redirect_code; // Apply a custom HTTP response code
-    ldout(s->cct, 10) << "retarget redirect code=" << redirect_code
+    ldpp_dout(s, 10) << "retarget redirect code=" << redirect_code
                      << " proto+host:" << protocol << "://" << hostname
                      << " -> " << s->redirect << dendl;
     return -ERR_WEBSITE_REDIRECT;
@@ -3496,9 +5026,10 @@ int RGWHandler_REST_S3Website::retarget(RGWOp* op, RGWOp** new_op) {
   /*
    * FIXME: if s->object != new_obj, drop op and create a new op to handle
    * operation. Or remove this comment if it's not applicable anymore
+   * dang: This could be problematic, since we're not actually replacing op, but
+   * we are replacing s->object.  Something might have a pointer to it.
    */
-
-  s->object = new_obj;
+  s->object = s->bucket->get_object(new_obj);
 
   return 0;
 }
@@ -3513,7 +5044,7 @@ RGWOp* RGWHandler_REST_S3Website::op_head()
   return get_obj_op(false);
 }
 
-int RGWHandler_REST_S3Website::serve_errordoc(int http_ret, const string& errordoc_key) {
+int RGWHandler_REST_S3Website::serve_errordoc(const DoutPrefixProvider *dpp, int http_ret, const string& errordoc_key, optional_yield y) {
   int ret = 0;
   s->formatter->reset(); /* Try to throw it all away */
 
@@ -3527,17 +5058,20 @@ int RGWHandler_REST_S3Website::serve_errordoc(int http_ret, const string& errord
   getop->if_unmod = NULL;
   getop->if_match = NULL;
   getop->if_nomatch = NULL;
-  s->object = errordoc_key;
+  /* This is okay.  It's an error, so nothing will run after this, and it can be
+   * called by abort_early(), which can be called before s->object or s->bucket
+   * are set up. Note, it won't have bucket. */
+  s->object = store->get_object(errordoc_key);
 
-  ret = init_permissions(getop.get());
+  ret = init_permissions(getop.get(), y);
   if (ret < 0) {
-    ldout(s->cct, 20) << "serve_errordoc failed, init_permissions ret=" << ret << dendl;
+    ldpp_dout(s, 20) << "serve_errordoc failed, init_permissions ret=" << ret << dendl;
     return -1; // Trigger double error handler
   }
 
-  ret = read_permissions(getop.get());
+  ret = read_permissions(getop.get(), y);
   if (ret < 0) {
-    ldout(s->cct, 20) << "serve_errordoc failed, read_permissions ret=" << ret << dendl;
+    ldpp_dout(s, 20) << "serve_errordoc failed, read_permissions ret=" << ret << dendl;
     return -1; // Trigger double error handler
   }
 
@@ -3545,27 +5079,27 @@ int RGWHandler_REST_S3Website::serve_errordoc(int http_ret, const string& errord
      getop->set_custom_http_response(http_ret);
   }
 
-  ret = getop->init_processing();
+  ret = getop->init_processing(y);
   if (ret < 0) {
-    ldout(s->cct, 20) << "serve_errordoc failed, init_processing ret=" << ret << dendl;
+    ldpp_dout(s, 20) << "serve_errordoc failed, init_processing ret=" << ret << dendl;
     return -1; // Trigger double error handler
   }
 
   ret = getop->verify_op_mask();
   if (ret < 0) {
-    ldout(s->cct, 20) << "serve_errordoc failed, verify_op_mask ret=" << ret << dendl;
+    ldpp_dout(s, 20) << "serve_errordoc failed, verify_op_mask ret=" << ret << dendl;
     return -1; // Trigger double error handler
   }
 
-  ret = getop->verify_permission();
+  ret = getop->verify_permission(y);
   if (ret < 0) {
-    ldout(s->cct, 20) << "serve_errordoc failed, verify_permission ret=" << ret << dendl;
+    ldpp_dout(s, 20) << "serve_errordoc failed, verify_permission ret=" << ret << dendl;
     return -1; // Trigger double error handler
   }
 
   ret = getop->verify_params();
   if (ret < 0) {
-    ldout(s->cct, 20) << "serve_errordoc failed, verify_params ret=" << ret << dendl;
+    ldpp_dout(s, 20) << "serve_errordoc failed, verify_params ret=" << ret << dendl;
     return -1; // Trigger double error handler
   }
 
@@ -3578,14 +5112,14 @@ int RGWHandler_REST_S3Website::serve_errordoc(int http_ret, const string& errord
    *   x-amz-error-message: The specified key does not exist.
    *   x-amz-error-detail-Key: foo
    */
-  getop->execute();
+  getop->execute(y);
   getop->complete();
   return 0;
-
 }
 
 int RGWHandler_REST_S3Website::error_handler(int err_no,
-                                           string* error_content) {
+                                            string* error_content,
+                                            optional_yield y) {
   int new_err_no = -1;
   rgw_http_errors::const_iterator r = rgw_http_s3_errors.find(err_no > 0 ? err_no : -err_no);
   int http_error_code = -1;
@@ -3593,41 +5127,45 @@ int RGWHandler_REST_S3Website::error_handler(int err_no,
   if (r != rgw_http_s3_errors.end()) {
     http_error_code = r->second.first;
   }
-  ldout(s->cct, 10) << "RGWHandler_REST_S3Website::error_handler err_no=" << err_no << " http_ret=" << http_error_code << dendl;
+  ldpp_dout(s, 10) << "RGWHandler_REST_S3Website::error_handler err_no=" << err_no << " http_ret=" << http_error_code << dendl;
 
   RGWBWRoutingRule rrule;
-  bool should_redirect =
-    s->bucket_info.website_conf.should_redirect(s->object.name, http_error_code,
-                                               &rrule);
+  bool have_bucket = !rgw::sal::Bucket::empty(s->bucket.get());
+  bool should_redirect = false;
+  if (have_bucket) {
+    should_redirect =
+      s->bucket->get_info().website_conf.should_redirect(original_object_name,
+                                                        http_error_code, &rrule);
+  }
 
   if (should_redirect) {
     const string& hostname = s->info.env->get("HTTP_HOST", "");
     const string& protocol =
       (s->info.env->get("SERVER_PORT_SECURE") ? "https" : "http");
     int redirect_code = 0;
-    rrule.apply_rule(protocol, hostname, s->object.name, &s->redirect,
-                   &redirect_code);
+    rrule.apply_rule(protocol, hostname, original_object_name,
+                     &s->redirect, &redirect_code);
     // Apply a custom HTTP response code
     if (redirect_code > 0)
       s->err.http_ret = redirect_code; // Apply a custom HTTP response code
-    ldout(s->cct, 10) << "error handler redirect code=" << redirect_code
+    ldpp_dout(s, 10) << "error handler redirect code=" << redirect_code
                      << " proto+host:" << protocol << "://" << hostname
                      << " -> " << s->redirect << dendl;
     return -ERR_WEBSITE_REDIRECT;
   } else if (err_no == -ERR_WEBSITE_REDIRECT) {
     // Do nothing here, this redirect will be handled in abort_early's ERR_WEBSITE_REDIRECT block
     // Do NOT fire the ErrorDoc handler
-  } else if (!s->bucket_info.website_conf.error_doc.empty()) {
+  } else if (have_bucket && !s->bucket->get_info().website_conf.error_doc.empty()) {
     /* This serves an entire page!
        On success, it will return zero, and no further content should be sent to the socket
        On failure, we need the double-error handler
      */
-    new_err_no = RGWHandler_REST_S3Website::serve_errordoc(http_error_code, s->bucket_info.website_conf.error_doc);
-    if (new_err_no && new_err_no != -1) {
+    new_err_no = RGWHandler_REST_S3Website::serve_errordoc(s, http_error_code, s->bucket->get_info().website_conf.error_doc, y);
+    if (new_err_no != -1) {
       err_no = new_err_no;
     }
   } else {
-    ldout(s->cct, 20) << "No special error handling today!" << dendl;
+    ldpp_dout(s, 20) << "No special error handling today!" << dendl;
   }
 
   return err_no;
@@ -3667,37 +5205,7 @@ RGWOp* RGWHandler_REST_Service_S3Website::get_obj_op(bool get_data)
 }
 
 
-namespace rgw {
-namespace auth {
-namespace s3 {
-
-bool AWSGeneralAbstractor::is_time_skew_ok(const utime_t& header_time,
-                                           const bool qsr) const
-{
-  /* Check for time skew first. */
-  const time_t req_sec = header_time.sec();
-  time_t now;
-  time(&now);
-
-  if ((req_sec < now - RGW_AUTH_GRACE_MINS * 60 ||
-       req_sec > now + RGW_AUTH_GRACE_MINS * 60) && !qsr) {
-    ldout(cct, 10) << "req_sec=" << req_sec << " now=" << now
-                   << "; now - RGW_AUTH_GRACE_MINS="
-                   << now - RGW_AUTH_GRACE_MINS * 60
-                   << "; now + RGW_AUTH_GRACE_MINS="
-                   << now + RGW_AUTH_GRACE_MINS * 60
-                   << dendl;
-
-    ldout(cct, 0)  << "NOTICE: request time skew too big now="
-                   << utime_t(now, 0)
-                   << " req_time=" << header_time
-                   << dendl;
-    return false;
-  } else {
-    return true;
-  }
-}
-
+namespace rgw::auth::s3 {
 
 static rgw::auth::Completer::cmplptr_t
 null_completer_factory(const boost::optional<std::string>& secret_key)
@@ -3726,32 +5234,170 @@ AWSGeneralAbstractor::get_auth_data(const req_state* const s) const
 boost::optional<std::string>
 AWSGeneralAbstractor::get_v4_canonical_headers(
   const req_info& info,
-  const boost::string_view& signedheaders,
+  const std::string_view& signedheaders,
   const bool using_qs) const
 {
   return rgw::auth::s3::get_v4_canonical_headers(info, signedheaders,
                                                  using_qs, false);
 }
 
+AWSSignerV4::prepare_result_t
+AWSSignerV4::prepare(const DoutPrefixProvider *dpp,
+                     const std::string& access_key_id,
+                     const string& region,
+                     const string& service,
+                     const req_info& info,
+                     const bufferlist *opt_content,
+                     bool s3_op)
+{
+  std::string signed_hdrs;
+
+  ceph::real_time timestamp = ceph::real_clock::now();
+
+  map<string, string> extra_headers;
+
+  std::string date = ceph::to_iso_8601_no_separators(timestamp, ceph::iso_8601_format::YMDhms);
+
+  std::string credential_scope = gen_v4_scope(timestamp, region, service);
+
+  extra_headers["x-amz-date"] = date;
+
+  string content_hash;
+
+  if (opt_content) {
+    content_hash = rgw::auth::s3::calc_v4_payload_hash(opt_content->to_str());
+    extra_headers["x-amz-content-sha256"] = content_hash;
+
+  }
+
+  /* craft canonical headers */
+  std::string canonical_headers = \
+    gen_v4_canonical_headers(info, extra_headers, &signed_hdrs);
+
+  using sanitize = rgw::crypt_sanitize::log_content;
+  ldpp_dout(dpp, 10) << "canonical headers format = "
+                     << sanitize{canonical_headers} << dendl;
+
+  bool is_non_s3_op = !s3_op;
+
+  const char* exp_payload_hash = nullptr;
+  string payload_hash;
+  if (is_non_s3_op) {
+    //For non s3 ops, we need to calculate the payload hash
+    payload_hash = info.args.get("PayloadHash");
+    exp_payload_hash = payload_hash.c_str();
+  } else {
+    /* Get the expected hash. */
+    if (content_hash.empty()) {
+      exp_payload_hash = rgw::auth::s3::get_v4_exp_payload_hash(info);
+    } else {
+      exp_payload_hash = content_hash.c_str();
+    }
+  }
+
+  /* Craft canonical URI. Using std::move later so let it be non-const. */
+  auto canonical_uri = rgw::auth::s3::gen_v4_canonical_uri(info);
+
+
+  /* Craft canonical query string. std::moving later so non-const here. */
+  auto canonical_qs = rgw::auth::s3::gen_v4_canonical_qs(info);
+
+  auto cct = dpp->get_cct();
+
+  /* Craft canonical request. */
+  auto canonical_req_hash = \
+    rgw::auth::s3::get_v4_canon_req_hash(cct,
+                                         info.method,
+                                         std::move(canonical_uri),
+                                         std::move(canonical_qs),
+                                         std::move(canonical_headers),
+                                         signed_hdrs,
+                                         exp_payload_hash,
+                                         dpp);
+
+  auto string_to_sign = \
+    rgw::auth::s3::get_v4_string_to_sign(cct,
+                                         AWS4_HMAC_SHA256_STR,
+                                         date,
+                                         credential_scope,
+                                         std::move(canonical_req_hash),
+                                         dpp);
+
+  const auto sig_factory = gen_v4_signature;
+
+  /* Requests authenticated with the Query Parameters are treated as unsigned.
+   * From "Authenticating Requests: Using Query Parameters (AWS Signature
+   * Version 4)":
+   *
+   *   You don't include a payload hash in the Canonical Request, because
+   *   when you create a presigned URL, you don't know the payload content
+   *   because the URL is used to upload an arbitrary payload. Instead, you
+   *   use a constant string UNSIGNED-PAYLOAD.
+   *
+   * This means we have absolutely no business in spawning completer. Both
+   * aws4_auth_needs_complete and aws4_auth_streaming_mode are set to false
+   * by default. We don't need to change that. */
+  return {
+    access_key_id,
+    date,
+    credential_scope,
+    std::move(signed_hdrs),
+    std::move(string_to_sign),
+    std::move(extra_headers),
+    sig_factory,
+  };
+}
+
+AWSSignerV4::signature_headers_t
+gen_v4_signature(const DoutPrefixProvider *dpp,
+                 const std::string_view& secret_key,
+                 const AWSSignerV4::prepare_result_t& sig_info)
+{
+  auto signature = rgw::auth::s3::get_v4_signature(sig_info.scope,
+                                                   dpp->get_cct(),
+                                                   secret_key,
+                                                   sig_info.string_to_sign,
+                                                   dpp);
+  AWSSignerV4::signature_headers_t result;
+
+  for (auto& entry : sig_info.extra_headers) {
+    result[entry.first] = entry.second;
+  }
+  auto& payload_hash = result["x-amz-content-sha256"];
+  if (payload_hash.empty()) {
+    payload_hash = AWS4_UNSIGNED_PAYLOAD_HASH;
+  }
+  string auth_header = string("AWS4-HMAC-SHA256 Credential=").append(sig_info.access_key_id) + "/";
+  auth_header.append(sig_info.scope + ",SignedHeaders=")
+             .append(sig_info.signed_headers + ",Signature=")
+             .append(signature);
+  result["Authorization"] = auth_header;
+
+  return result;
+}
+
+
 AWSEngine::VersionAbstractor::auth_data_t
 AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
-                                       /* FIXME: const. */
-                                       bool using_qs) const
-{
-  boost::string_view access_key_id;
-  boost::string_view signed_hdrs;
-
-  boost::string_view date;
-  boost::string_view credential_scope;
-  boost::string_view client_signature;
-
-  int ret = rgw::auth::s3::parse_credentials(s->info,
-                                             access_key_id,
-                                             credential_scope,
-                                             signed_hdrs,
-                                             client_signature,
-                                             date,
-                                             using_qs);
+                                       const bool using_qs) const
+{
+  std::string_view access_key_id;
+  std::string_view signed_hdrs;
+
+  std::string_view date;
+  std::string_view credential_scope;
+  std::string_view client_signature;
+  std::string_view session_token;
+
+  int ret = rgw::auth::s3::parse_v4_credentials(s->info,
+                                               access_key_id,
+                                               credential_scope,
+                                               signed_hdrs,
+                                               client_signature,
+                                               date,
+                                               session_token,
+                                               using_qs,
+                                                s);
   if (ret < 0) {
     throw ret;
   }
@@ -3760,14 +5406,25 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
   boost::optional<std::string> canonical_headers = \
     get_v4_canonical_headers(s->info, signed_hdrs, using_qs);
   if (canonical_headers) {
-    ldout(s->cct, 10) << "canonical headers format = " << *canonical_headers
-                      << dendl;
+    using sanitize = rgw::crypt_sanitize::log_content;
+    ldpp_dout(s, 10) << "canonical headers format = "
+                      << sanitize{*canonical_headers} << dendl;
   } else {
     throw -EPERM;
   }
 
-  /* Get the expected hash. */
-  auto exp_payload_hash = rgw::auth::s3::get_v4_exp_payload_hash(s->info);
+  bool is_non_s3_op = rgw::auth::s3::is_non_s3_op(s->op_type);
+
+  const char* exp_payload_hash = nullptr;
+  string payload_hash;
+  if (is_non_s3_op) {
+    //For non s3 ops, we need to calculate the payload hash
+    payload_hash = s->info.args.get("PayloadHash");
+    exp_payload_hash = payload_hash.c_str();
+  } else {
+    /* Get the expected hash. */
+    exp_payload_hash = rgw::auth::s3::get_v4_exp_payload_hash(s->info);
+  }
 
   /* Craft canonical URI. Using std::move later so let it be non-const. */
   auto canonical_uri = rgw::auth::s3::get_v4_canonical_uri(s->info);
@@ -3783,20 +5440,23 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
                                          std::move(canonical_qs),
                                          std::move(*canonical_headers),
                                          signed_hdrs,
-                                         exp_payload_hash);
+                                         exp_payload_hash,
+                                         s);
 
   auto string_to_sign = \
     rgw::auth::s3::get_v4_string_to_sign(s->cct,
                                          AWS4_HMAC_SHA256_STR,
                                          date,
                                          credential_scope,
-                                         std::move(canonical_req_hash));
+                                         std::move(canonical_req_hash),
+                                         s);
 
   const auto sig_factory = std::bind(rgw::auth::s3::get_v4_signature,
                                      credential_scope,
                                      std::placeholders::_1,
                                      std::placeholders::_2,
-                                     std::placeholders::_3);
+                                     std::placeholders::_3,
+                                     s);
 
   /* Requests authenticated with the Query Parameters are treated as unsigned.
    * From "Authenticating Requests: Using Query Parameters (AWS Signature
@@ -3810,10 +5470,11 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
    * This means we have absolutely no business in spawning completer. Both
    * aws4_auth_needs_complete and aws4_auth_streaming_mode are set to false
    * by default. We don't need to change that. */
-  if (is_v4_payload_unsigned(exp_payload_hash) || is_v4_payload_empty(s)) {
+  if (is_v4_payload_unsigned(exp_payload_hash) || is_v4_payload_empty(s) || is_non_s3_op) {
     return {
       access_key_id,
       client_signature,
+      session_token,
       std::move(string_to_sign),
       sig_factory,
       null_completer_factory
@@ -3826,7 +5487,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
      *   Version 4 requests. It provides a hash of the request payload. If
      *   there is no payload, you must provide the hash of an empty string. */
     if (!is_v4_payload_streamed(exp_payload_hash)) {
-      ldout(s->cct, 10) << "delaying v4 auth" << dendl;
+      ldpp_dout(s, 10) << "delaying v4 auth" << dendl;
 
       /* payload in a single chunk */
       switch (s->op_type)
@@ -3835,18 +5496,39 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
         case RGW_OP_PUT_OBJ:
         case RGW_OP_PUT_ACLS:
         case RGW_OP_PUT_CORS:
+        case RGW_OP_PUT_BUCKET_ENCRYPTION:
+        case RGW_OP_GET_BUCKET_ENCRYPTION:
+        case RGW_OP_DELETE_BUCKET_ENCRYPTION:
         case RGW_OP_INIT_MULTIPART: // in case that Init Multipart uses CHUNK encoding
         case RGW_OP_COMPLETE_MULTIPART:
         case RGW_OP_SET_BUCKET_VERSIONING:
         case RGW_OP_DELETE_MULTI_OBJ:
         case RGW_OP_ADMIN_SET_METADATA:
+        case RGW_OP_SYNC_DATALOG_NOTIFY:
+        case RGW_OP_SYNC_MDLOG_NOTIFY:
+        case RGW_OP_PERIOD_POST:
         case RGW_OP_SET_BUCKET_WEBSITE:
         case RGW_OP_PUT_BUCKET_POLICY:
         case RGW_OP_PUT_OBJ_TAGGING:
+       case RGW_OP_PUT_BUCKET_TAGGING:
+       case RGW_OP_PUT_BUCKET_REPLICATION:
         case RGW_OP_PUT_LC:
+        case RGW_OP_SET_REQUEST_PAYMENT:
+        case RGW_OP_PUBSUB_NOTIF_CREATE:
+        case RGW_OP_PUBSUB_NOTIF_DELETE:
+        case RGW_OP_PUBSUB_NOTIF_LIST:
+        case RGW_OP_PUT_BUCKET_OBJ_LOCK:
+        case RGW_OP_PUT_OBJ_RETENTION:
+        case RGW_OP_PUT_OBJ_LEGAL_HOLD:
+        case RGW_STS_GET_SESSION_TOKEN:
+        case RGW_STS_ASSUME_ROLE:
+        case RGW_OP_PUT_BUCKET_PUBLIC_ACCESS_BLOCK:
+        case RGW_OP_GET_BUCKET_PUBLIC_ACCESS_BLOCK:
+        case RGW_OP_DELETE_BUCKET_PUBLIC_ACCESS_BLOCK:
+       case RGW_OP_GET_OBJ://s3select its post-method(payload contain the query) , the request is get-object
           break;
         default:
-          dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
+          ldpp_dout(s, 10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
           throw -ERR_NOT_IMPLEMENTED;
       }
 
@@ -3856,6 +5538,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
       return {
         access_key_id,
         client_signature,
+        session_token,
         std::move(string_to_sign),
         sig_factory,
         cmpl_factory
@@ -3864,7 +5547,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
       /* IMHO "streamed" doesn't fit too good here. I would prefer to call
        * it "chunked" but let's be coherent with Amazon's terminology. */
 
-      dout(10) << "body content detected in multiple chunks" << dendl;
+      ldpp_dout(s, 10) << "body content detected in multiple chunks" << dendl;
 
       /* payload in multiple chunks */
 
@@ -3873,11 +5556,11 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
         case RGW_OP_PUT_OBJ:
           break;
         default:
-          dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED (streaming mode)" << dendl;
+          ldpp_dout(s, 10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED (streaming mode)" << dendl;
           throw -ERR_NOT_IMPLEMENTED;
       }
 
-      dout(10) << "aws4 seed signature ok... delaying v4 auth" << dendl;
+      ldpp_dout(s, 10) << "aws4 seed signature ok... delaying v4 auth" << dendl;
 
       /* In the case of streamed payload client sets the x-amz-content-sha256
        * to "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" but uses "UNSIGNED-PAYLOAD"
@@ -3898,6 +5581,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
       return {
         access_key_id,
         client_signature,
+        session_token,
         std::move(string_to_sign),
         sig_factory,
         cmpl_factory
@@ -3910,7 +5594,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
 boost::optional<std::string>
 AWSGeneralBoto2Abstractor::get_v4_canonical_headers(
   const req_info& info,
-  const boost::string_view& signedheaders,
+  const std::string_view& signedheaders,
   const bool using_qs) const
 {
   return rgw::auth::s3::get_v4_canonical_headers(info, signedheaders,
@@ -3921,8 +5605,9 @@ AWSGeneralBoto2Abstractor::get_v4_canonical_headers(
 AWSEngine::VersionAbstractor::auth_data_t
 AWSGeneralAbstractor::get_auth_data_v2(const req_state* const s) const
 {
-  boost::string_view access_key_id;
-  boost::string_view signature;
+  std::string_view access_key_id;
+  std::string_view signature;
+  std::string_view session_token;
   bool qsr = false;
 
   const char* http_auth = s->info.env->get("HTTP_AUTHORIZATION");
@@ -3933,48 +5618,65 @@ AWSGeneralAbstractor::get_auth_data_v2(const req_state* const s) const
     signature = s->info.args.get("Signature");
     qsr = true;
 
-    boost::string_view expires = s->info.args.get("Expires");
-    if (! expires.empty()) {
-      /* It looks we have the guarantee that expires is a null-terminated,
-       * and thus string_view::data() can be safely used. */
-      const time_t exp = atoll(expires.data());
-      time_t now;
-      time(&now);
+    std::string_view expires = s->info.args.get("Expires");
+    if (expires.empty()) {
+      throw -EPERM;
+    }
+
+    /* It looks we have the guarantee that expires is a null-terminated,
+     * and thus string_view::data() can be safely used. */
+    const time_t exp = atoll(expires.data());
+    time_t now;
+    time(&now);
 
-      if (now >= exp) {
+    if (now >= exp) {
+      throw -EPERM;
+    }
+    if (s->info.args.exists("x-amz-security-token")) {
+      session_token = s->info.args.get("x-amz-security-token");
+      if (session_token.size() == 0) {
         throw -EPERM;
       }
     }
+
   } else {
     /* The "Authorization" HTTP header is being used. */
-    const boost::string_view auth_str(http_auth + strlen("AWS "));
+    const std::string_view auth_str(http_auth + strlen("AWS "));
     const size_t pos = auth_str.rfind(':');
-    if (pos != boost::string_view::npos) {
+    if (pos != std::string_view::npos) {
       access_key_id = auth_str.substr(0, pos);
       signature = auth_str.substr(pos + 1);
     }
+
+    if (s->info.env->exists("HTTP_X_AMZ_SECURITY_TOKEN")) {
+      session_token = s->info.env->get("HTTP_X_AMZ_SECURITY_TOKEN");
+      if (session_token.size() == 0) {
+        throw -EPERM;
+      }
+    }
   }
 
   /* Let's canonize the HTTP headers that are covered by the AWS auth v2. */
   std::string string_to_sign;
   utime_t header_time;
-  if (! rgw_create_s3_canonical_header(s->info, &header_time, string_to_sign,
+  if (! rgw_create_s3_canonical_header(s, s->info, &header_time, string_to_sign,
         qsr)) {
-    ldout(cct, 10) << "failed to create the canonized auth header\n"
+    ldpp_dout(s, 10) << "failed to create the canonized auth header\n"
                    << rgw::crypt_sanitize::auth{s,string_to_sign} << dendl;
     throw -EPERM;
   }
 
-  ldout(cct, 10) << "string_to_sign:\n"
+  ldpp_dout(s, 10) << "string_to_sign:\n"
                  << rgw::crypt_sanitize::auth{s,string_to_sign} << dendl;
 
-  if (! is_time_skew_ok(header_time, qsr)) {
+  if (!qsr && !is_time_skew_ok(header_time)) {
     throw -ERR_REQUEST_TIME_SKEWED;
   }
 
   return {
     std::move(access_key_id),
     std::move(signature),
+    std::move(session_token),
     std::move(string_to_sign),
     rgw::auth::s3::get_v2_signature,
     null_completer_factory
@@ -3988,6 +5690,7 @@ AWSBrowserUploadAbstractor::get_auth_data_v2(const req_state* const s) const
   return {
     s->auth.s3_postobj_creds.access_key,
     s->auth.s3_postobj_creds.signature,
+    s->auth.s3_postobj_creds.x_amz_security_token,
     s->auth.s3_postobj_creds.encoded_policy.to_str(),
     rgw::auth::s3::get_v2_signature,
     null_completer_factory
@@ -3997,26 +5700,28 @@ AWSBrowserUploadAbstractor::get_auth_data_v2(const req_state* const s) const
 AWSEngine::VersionAbstractor::auth_data_t
 AWSBrowserUploadAbstractor::get_auth_data_v4(const req_state* const s) const
 {
-  const boost::string_view credential = s->auth.s3_postobj_creds.x_amz_credential;
+  const std::string_view credential = s->auth.s3_postobj_creds.x_amz_credential;
 
   /* grab access key id */
   const size_t pos = credential.find("/");
-  const boost::string_view access_key_id = credential.substr(0, pos);
-  dout(10) << "access key id = " << access_key_id << dendl;
+  const std::string_view access_key_id = credential.substr(0, pos);
+  ldpp_dout(s, 10) << "access key id = " << access_key_id << dendl;
 
   /* grab credential scope */
-  const boost::string_view credential_scope = credential.substr(pos + 1);
-  dout(10) << "credential scope = " << credential_scope << dendl;
+  const std::string_view credential_scope = credential.substr(pos + 1);
+  ldpp_dout(s, 10) << "credential scope = " << credential_scope << dendl;
 
   const auto sig_factory = std::bind(rgw::auth::s3::get_v4_signature,
                                      credential_scope,
                                      std::placeholders::_1,
                                      std::placeholders::_2,
-                                     std::placeholders::_3);
+                                     std::placeholders::_3,
+                                     s);
 
   return {
     access_key_id,
     s->auth.s3_postobj_creds.signature,
+    s->auth.s3_postobj_creds.x_amz_security_token,
     s->auth.s3_postobj_creds.encoded_policy.to_str(),
     sig_factory,
     null_completer_factory
@@ -4027,18 +5732,17 @@ AWSEngine::VersionAbstractor::auth_data_t
 AWSBrowserUploadAbstractor::get_auth_data(const req_state* const s) const
 {
   if (s->auth.s3_postobj_creds.x_amz_algorithm == AWS4_HMAC_SHA256_STR) {
-    ldout(s->cct, 0) << "Signature verification algorithm AWS v4"
+    ldpp_dout(s, 0) << "Signature verification algorithm AWS v4"
                      << " (AWS4-HMAC-SHA256)" << dendl;
     return get_auth_data_v4(s);
   } else {
-    ldout(s->cct, 0) << "Signature verification algorithm AWS v2" << dendl;
+    ldpp_dout(s, 0) << "Signature verification algorithm AWS v2" << dendl;
     return get_auth_data_v2(s);
   }
 }
 
-
 AWSEngine::result_t
-AWSEngine::authenticate(const req_state* const s) const
+AWSEngine::authenticate(const DoutPrefixProvider* dpp, const req_state* const s, optional_yield y) const
 {
   /* Small reminder: an ver_abstractor is allowed to throw! */
   const auto auth_data = ver_abstractor.get_auth_data(s);
@@ -4046,24 +5750,29 @@ AWSEngine::authenticate(const req_state* const s) const
   if (auth_data.access_key_id.empty() || auth_data.client_signature.empty()) {
     return result_t::deny(-EINVAL);
   } else {
-    return authenticate(auth_data.access_key_id,
+    return authenticate(dpp,
+                        auth_data.access_key_id,
                        auth_data.client_signature,
+                       auth_data.session_token,
                        auth_data.string_to_sign,
                         auth_data.signature_factory,
                        auth_data.completer_factory,
-                       s);
+                       s, y);
   }
 }
 
-} /* namespace s3 */
-} /* namespace auth */
-} /* namespace rgw */
+} // namespace rgw::auth::s3
 
 rgw::LDAPHelper* rgw::auth::s3::LDAPEngine::ldh = nullptr;
 std::mutex rgw::auth::s3::LDAPEngine::mtx;
 
 void rgw::auth::s3::LDAPEngine::init(CephContext* const cct)
 {
+  if (! cct->_conf->rgw_s3_auth_use_ldap ||
+      cct->_conf->rgw_ldap_uri.empty()) {
+    return;
+  }
+
   if (! ldh) {
     std::lock_guard<std::mutex> lck(mtx);
     if (! ldh) {
@@ -4083,6 +5792,11 @@ void rgw::auth::s3::LDAPEngine::init(CephContext* const cct)
   }
 }
 
+bool rgw::auth::s3::LDAPEngine::valid() {
+  std::lock_guard<std::mutex> lck(mtx);
+  return (!!ldh);
+}
+
 rgw::auth::RemoteApplier::acl_strategy_t
 rgw::auth::s3::LDAPEngine::get_acl_strategy() const
 {
@@ -4110,12 +5824,15 @@ rgw::auth::s3::LDAPEngine::get_creds_info(const rgw::RGWToken& token) const noex
 
 rgw::auth::Engine::result_t
 rgw::auth::s3::LDAPEngine::authenticate(
-  const boost::string_view& access_key_id,
-  const boost::string_view& signature,
+  const DoutPrefixProvider* dpp,
+  const std::string_view& access_key_id,
+  const std::string_view& signature,
+  const std::string_view& session_token,
   const string_to_sign_t& string_to_sign,
   const signature_factory_t&,
   const completer_factory_t& completer_factory,
-  const req_state* const s) const
+  const req_state* const s,
+  optional_yield y) const
 {
   /* boost filters and/or string_ref may throw on invalid input */
   rgw::RGWToken base64_token;
@@ -4136,74 +5853,266 @@ rgw::auth::s3::LDAPEngine::authenticate(
   user_info.user_id = base64_token.id;
   if (rgw_get_user_info_by_uid(store, user_info.user_id, user_info) >= 0) {
     if (user_info.type != TYPE_LDAP) {
-      ldout(cct, 10) << "ERROR: User id of type: " << user_info.type << " is already present" << dendl;
+      ldpp_dout(dpp, 10) << "ERROR: User id of type: " << user_info.type << " is already present" << dendl;
       return nullptr;
     }
   }*/
 
   if (ldh->auth(base64_token.id, base64_token.key) != 0) {
-    return result_t::deny();
+    return result_t::deny(-ERR_INVALID_ACCESS_KEY);
   }
 
   auto apl = apl_factory->create_apl_remote(cct, s, get_acl_strategy(),
                                             get_creds_info(base64_token));
   return result_t::grant(std::move(apl), completer_factory(boost::none));
-}
+} /* rgw::auth::s3::LDAPEngine::authenticate */
 
+void rgw::auth::s3::LDAPEngine::shutdown() {
+  if (ldh) {
+    delete ldh;
+    ldh = nullptr;
+  }
+}
 
-/* LocalEndgine */
+/* LocalEngine */
 rgw::auth::Engine::result_t
 rgw::auth::s3::LocalEngine::authenticate(
-  const boost::string_view& _access_key_id,
-  const boost::string_view& signature,
+  const DoutPrefixProvider* dpp,
+  const std::string_view& _access_key_id,
+  const std::string_view& signature,
+  const std::string_view& session_token,
   const string_to_sign_t& string_to_sign,
   const signature_factory_t& signature_factory,
   const completer_factory_t& completer_factory,
-  const req_state* const s) const
+  const req_state* const s,
+  optional_yield y) const
 {
   /* get the user info */
-  RGWUserInfo user_info;
+  std::unique_ptr<rgw::sal::User> user;
+  const std::string access_key_id(_access_key_id);
   /* TODO(rzarzynski): we need to have string-view taking variant. */
-  const std::string access_key_id = _access_key_id.to_string();
-  if (rgw_get_user_info_by_access_key(store, access_key_id, user_info) < 0) {
-      ldout(cct, 5) << "error reading user info, uid=" << access_key_id
+  if (store->get_user_by_access_key(dpp, access_key_id, y, &user) < 0) {
+      ldpp_dout(dpp, 5) << "error reading user info, uid=" << access_key_id
               << " can't authenticate" << dendl;
       return result_t::deny(-ERR_INVALID_ACCESS_KEY);
   }
   //TODO: Uncomment, when we have a migration plan in place.
   /*else {
     if (s->user->type != TYPE_RGW) {
-      ldout(cct, 10) << "ERROR: User id of type: " << s->user->type
+      ldpp_dout(dpp, 10) << "ERROR: User id of type: " << s->user->type
                      << " is present" << dendl;
       throw -EPERM;
     }
   }*/
 
-  const auto iter = user_info.access_keys.find(access_key_id);
-  if (iter == std::end(user_info.access_keys)) {
-    ldout(cct, 0) << "ERROR: access key not encoded in user info" << dendl;
+  const auto iter = user->get_info().access_keys.find(access_key_id);
+  if (iter == std::end(user->get_info().access_keys)) {
+    ldpp_dout(dpp, 0) << "ERROR: access key not encoded in user info" << dendl;
     return result_t::deny(-EPERM);
   }
   const RGWAccessKey& k = iter->second;
 
   const VersionAbstractor::server_signature_t server_signature = \
     signature_factory(cct, k.key, string_to_sign);
+  auto compare = signature.compare(server_signature);
 
-  ldout(cct, 15) << "string_to_sign="
+  ldpp_dout(dpp, 15) << "string_to_sign="
                  << rgw::crypt_sanitize::log_content{string_to_sign}
                  << dendl;
-  ldout(cct, 15) << "server signature=" << server_signature << dendl;
-  ldout(cct, 15) << "client signature=" << signature << dendl;
-  ldout(cct, 15) << "compare=" << signature.compare(server_signature) << dendl;
+  ldpp_dout(dpp, 15) << "server signature=" << server_signature << dendl;
+  ldpp_dout(dpp, 15) << "client signature=" << signature << dendl;
+  ldpp_dout(dpp, 15) << "compare=" << compare << dendl;
 
-  if (static_cast<boost::string_view>(server_signature) != signature) {
+  if (compare != 0) {
     return result_t::deny(-ERR_SIGNATURE_NO_MATCH);
   }
 
-  auto apl = apl_factory->create_apl_local(cct, s, user_info, k.subuser);
+  auto apl = apl_factory->create_apl_local(cct, s, user->get_info(),
+                                           k.subuser, std::nullopt);
   return result_t::grant(std::move(apl), completer_factory(k.key));
 }
 
+rgw::auth::RemoteApplier::AuthInfo
+rgw::auth::s3::STSEngine::get_creds_info(const STS::SessionToken& token) const noexcept
+{
+  using acct_privilege_t = \
+    rgw::auth::RemoteApplier::AuthInfo::acct_privilege_t;
+
+  return rgw::auth::RemoteApplier::AuthInfo {
+    token.user,
+    token.acct_name,
+    token.perm_mask,
+    (token.is_admin) ? acct_privilege_t::IS_ADMIN_ACCT: acct_privilege_t::IS_PLAIN_ACCT,
+    token.acct_type
+  };
+}
+
+int
+rgw::auth::s3::STSEngine::get_session_token(const DoutPrefixProvider* dpp, const std::string_view& session_token,
+                                            STS::SessionToken& token) const
+{
+  string decodedSessionToken;
+  try {
+    decodedSessionToken = rgw::from_base64(session_token);
+  } catch (...) {
+    ldpp_dout(dpp, 0) << "ERROR: Invalid session token, not base64 encoded." << dendl;
+    return -EINVAL;
+  }
+
+  auto* cryptohandler = cct->get_crypto_handler(CEPH_CRYPTO_AES);
+  if (! cryptohandler) {
+    return -EINVAL;
+  }
+  string secret_s = cct->_conf->rgw_sts_key;
+  buffer::ptr secret(secret_s.c_str(), secret_s.length());
+  int ret = 0;
+  if (ret = cryptohandler->validate_secret(secret); ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: Invalid secret key" << dendl;
+    return -EINVAL;
+  }
+  string error;
+  std::unique_ptr<CryptoKeyHandler> keyhandler(cryptohandler->get_key_handler(secret, error));
+  if (! keyhandler) {
+    return -EINVAL;
+  }
+  error.clear();
+
+  string decrypted_str;
+  buffer::list en_input, dec_output;
+  en_input = buffer::list::static_from_string(decodedSessionToken);
+
+  ret = keyhandler->decrypt(en_input, dec_output, &error);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: Decryption failed: " << error << dendl;
+    return -EPERM;
+  } else {
+    try {
+      dec_output.append('\0');
+      auto iter = dec_output.cbegin();
+      decode(token, iter);
+    } catch (const buffer::error& e) {
+      ldpp_dout(dpp, 0) << "ERROR: decode SessionToken failed: " << error << dendl;
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+rgw::auth::Engine::result_t
+rgw::auth::s3::STSEngine::authenticate(
+  const DoutPrefixProvider* dpp,
+  const std::string_view& _access_key_id,
+  const std::string_view& signature,
+  const std::string_view& session_token,
+  const string_to_sign_t& string_to_sign,
+  const signature_factory_t& signature_factory,
+  const completer_factory_t& completer_factory,
+  const req_state* const s,
+  optional_yield y) const
+{
+  if (! s->info.args.exists("x-amz-security-token") &&
+      ! s->info.env->exists("HTTP_X_AMZ_SECURITY_TOKEN") &&
+      s->auth.s3_postobj_creds.x_amz_security_token.empty()) {
+    return result_t::deny();
+  }
+
+  STS::SessionToken token;
+  if (int ret = get_session_token(dpp, session_token, token); ret < 0) {
+    return result_t::reject(ret);
+  }
+  //Authentication
+  //Check if access key is not the same passed in by client
+  if (token.access_key_id != _access_key_id) {
+    ldpp_dout(dpp, 0) << "Invalid access key" << dendl;
+    return result_t::reject(-EPERM);
+  }
+  //Check if the token has expired
+  if (! token.expiration.empty()) {
+    std::string expiration = token.expiration;
+    if (! expiration.empty()) {
+      boost::optional<real_clock::time_point> exp = ceph::from_iso_8601(expiration, false);
+      if (exp) {
+        real_clock::time_point now = real_clock::now();
+        if (now >= *exp) {
+          ldpp_dout(dpp, 0) << "ERROR: Token expired" << dendl;
+          return result_t::reject(-EPERM);
+        }
+      } else {
+        ldpp_dout(dpp, 0) << "ERROR: Invalid expiration: " << expiration << dendl;
+        return result_t::reject(-EPERM);
+      }
+    }
+  }
+  //Check for signature mismatch
+  const VersionAbstractor::server_signature_t server_signature = \
+    signature_factory(cct, token.secret_access_key, string_to_sign);
+  auto compare = signature.compare(server_signature);
+
+  ldpp_dout(dpp, 15) << "string_to_sign="
+                 << rgw::crypt_sanitize::log_content{string_to_sign}
+                 << dendl;
+  ldpp_dout(dpp, 15) << "server signature=" << server_signature << dendl;
+  ldpp_dout(dpp, 15) << "client signature=" << signature << dendl;
+  ldpp_dout(dpp, 15) << "compare=" << compare << dendl;
+
+  if (compare != 0) {
+    return result_t::reject(-ERR_SIGNATURE_NO_MATCH);
+  }
+
+  // Get all the authorization info
+  std::unique_ptr<rgw::sal::User> user;
+  rgw_user user_id;
+  string role_id;
+  rgw::auth::RoleApplier::Role r;
+  rgw::auth::RoleApplier::TokenAttrs t_attrs;
+  if (! token.roleId.empty()) {
+    std::unique_ptr<rgw::sal::RGWRole> role = store->get_role(token.roleId);
+    if (role->get_by_id(dpp, y) < 0) {
+      return result_t::deny(-EPERM);
+    }
+    r.id = token.roleId;
+    r.name = role->get_name();
+    r.tenant = role->get_tenant();
+
+    vector<string> role_policy_names = role->get_role_policy_names();
+    for (auto& policy_name : role_policy_names) {
+      string perm_policy;
+      if (int ret = role->get_role_policy(dpp, policy_name, perm_policy); ret == 0) {
+        r.role_policies.push_back(std::move(perm_policy));
+      }
+    }
+  }
+
+  user = store->get_user(token.user);
+  if (! token.user.empty() && token.acct_type != TYPE_ROLE) {
+    // get user info
+    int ret = user->load_user(dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 5) << "ERROR: failed reading user info: uid=" << token.user << dendl;
+      return result_t::reject(-EPERM);
+    }
+  }
+
+  if (token.acct_type == TYPE_KEYSTONE || token.acct_type == TYPE_LDAP) {
+    auto apl = remote_apl_factory->create_apl_remote(cct, s, get_acl_strategy(),
+                                            get_creds_info(token));
+    return result_t::grant(std::move(apl), completer_factory(token.secret_access_key));
+  } else if (token.acct_type == TYPE_ROLE) {
+    t_attrs.user_id = std::move(token.user); // This is mostly needed to assign the owner of a bucket during its creation
+    t_attrs.token_policy = std::move(token.policy);
+    t_attrs.role_session_name = std::move(token.role_session);
+    t_attrs.token_claims = std::move(token.token_claims);
+    t_attrs.token_issued_at = std::move(token.issued_at);
+    t_attrs.principal_tags = std::move(token.principal_tags);
+    auto apl = role_apl_factory->create_apl_role(cct, s, r, t_attrs);
+    return result_t::grant(std::move(apl), completer_factory(token.secret_access_key));
+  } else { // This is for all local users of type TYPE_RGW or TYPE_NONE
+    string subuser;
+    auto apl = local_apl_factory->create_apl_local(cct, s, user->get_info(), subuser, token.perm_mask);
+    return result_t::grant(std::move(apl), completer_factory(token.secret_access_key));
+  }
+}
+
 bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
   const req_state* s
 ) const noexcept {
@@ -4215,5 +6124,6 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
   AwsRoute route;
   std::tie(version, route) = discover_aws_flavour(s->info);
 
-  return route == AwsRoute::QUERY_STRING && version == AwsVersion::UNKOWN;
+  return route == AwsRoute::QUERY_STRING && version == AwsVersion::UNKNOWN;
 }
+