]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rest_s3.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_rest_s3.cc
index f51c38df9054c2e4c0d736ce2540b27c68ca6e3f..56d00dae3ca9b449a6af9fda650ed272c0d09c51 100644 (file)
@@ -16,6 +16,7 @@
 #include "auth/Crypto.h"
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/replace.hpp>
+#include <boost/algorithm/string/predicate.hpp>
 #include <boost/tokenizer.hpp>
 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
@@ -60,9 +61,6 @@
 #include "rgw_zone.h"
 #include "rgw_bucket_sync.h"
 
-#include "services/svc_zone.h"
-#include "services/svc_cls.h"
-
 #include "include/ceph_assert.h"
 #include "rgw_role.h"
 #include "rgw_rest_sts.h"
@@ -79,17 +77,17 @@ using namespace std;
 using namespace rgw;
 using namespace ceph::crypto;
 
-void list_all_buckets_start(struct req_state *s)
+void list_all_buckets_start(req_state *s)
 {
   s->formatter->open_array_section_in_ns("ListAllMyBucketsResult", XMLNS_AWS_S3);
 }
 
-void list_all_buckets_end(struct req_state *s)
+void list_all_buckets_end(req_state *s)
 {
   s->formatter->close_section();
 }
 
-void dump_bucket(struct req_state *s, rgw::sal::Bucket& obj)
+void dump_bucket(req_state *s, rgw::sal::Bucket& obj)
 {
   s->formatter->open_object_section("Bucket");
   s->formatter->dump_string("Name", obj.get_name());
@@ -111,7 +109,7 @@ void rgw_get_errno_s3(rgw_http_error *e , int err_no)
 }
 
 static inline std::string get_s3_expiration_header(
-  struct req_state* s,
+  req_state* s,
   const ceph::real_time& mtime)
 {
   return rgw::lc::s3_expiration_header(
@@ -119,7 +117,7 @@ static inline std::string get_s3_expiration_header(
 }
 
 static inline bool get_s3_multipart_abort_header(
-  struct req_state* s, const ceph::real_time& mtime,
+  req_state* s, const ceph::real_time& mtime,
   ceph::real_time& date, std::string& rule_id)
 {
   return rgw::lc::s3_multipart_abort_header(
@@ -301,6 +299,11 @@ int RGWGetObj_ObjStore_S3::get_params(optional_yield y)
     skip_decrypt = s->info.args.exists(RGW_SYS_PARAM_PREFIX "skip-decrypt");
   }
 
+  // multisite sync requests should fetch cloudtiered objects
+  sync_cloudtiered = s->info.args.exists(RGW_SYS_PARAM_PREFIX "sync-cloudtiered");
+
+  dst_zone_trace = s->info.args.get(RGW_SYS_PARAM_PREFIX "if-not-replicated-to");
+
   return RGWGetObj_ObjStore::get_params(y);
 }
 
@@ -431,6 +434,22 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
   } else {
     dump_header(s, "x-rgw-object-type", "Normal");
   }
+  // replication status
+  if (auto i = attrs.find(RGW_ATTR_OBJ_REPLICATION_STATUS);
+      i != attrs.end()) {
+    dump_header(s, "x-amz-replication-status", i->second);
+  }
+  if (auto i = attrs.find(RGW_ATTR_OBJ_REPLICATION_TRACE);
+      i != attrs.end()) {
+    try {
+      std::vector<rgw_zone_set_entry> zones;
+      auto p = i->second.cbegin();
+      decode(zones, p);
+      for (const auto& zone : zones) {
+        dump_header(s, "x-rgw-replicated-from", zone.to_str());
+      }
+    } catch (const buffer::error&) {} // omit x-rgw-replicated-from headers
+  }
 
   if (! op_ret) {
     if (! lo_etag.empty()) {
@@ -632,27 +651,31 @@ int RGWGetObj_ObjStore_S3::override_range_hdr(const rgw::auth::StrategyRegistry&
 
 void RGWGetObjTags_ObjStore_S3::send_response_data(bufferlist& bl)
 {
+  if (op_ret)
+    set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
-  s->formatter->open_object_section_in_ns("Tagging", XMLNS_AWS_S3);
-  s->formatter->open_object_section("TagSet");
-  if (has_tags){
-    RGWObjTagSet_S3 tagset;
-    auto iter = bl.cbegin();
-    try {
-      tagset.decode(iter);
-    } catch (buffer::error& err) {
-      ldpp_dout(this,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
-      op_ret= -EIO;
-      return;
+  if (!op_ret){
+    s->formatter->open_object_section_in_ns("Tagging", XMLNS_AWS_S3);
+    s->formatter->open_object_section("TagSet");
+    if (has_tags){
+      RGWObjTagSet_S3 tagset;
+      auto iter = bl.cbegin();
+      try {
+        tagset.decode(iter);
+      } catch (buffer::error& err) {
+        ldpp_dout(this,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+        op_ret= -EIO;
+        return;
+      }
+      tagset.dump_xml(s->formatter);
     }
-    tagset.dump_xml(s->formatter);
+    s->formatter->close_section();
+    s->formatter->close_section();
+    rgw_flush_formatter_and_reset(s, s->formatter);
   }
-  s->formatter->close_section();
-  s->formatter->close_section();
-  rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
 
@@ -702,22 +725,21 @@ void RGWPutObjTags_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
 }
 
 void RGWDeleteObjTags_ObjStore_S3::send_response()
 {
-  int r = op_ret;
-  if (r == -ENOENT)
-    r = 0;
-  if (!r)
-    r = STATUS_NO_CONTENT;
-
-  set_req_state_err(s, r);
+  if (op_ret == 0){
+    op_ret = STATUS_NO_CONTENT;
+  }
+  if (op_ret)
+    set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this);
+  end_header(s, this, to_mime_type(s->format));
+  dump_start(s);
 }
 
 void RGWGetBucketTags_ObjStore_S3::send_response_data(bufferlist& bl)
@@ -725,7 +747,7 @@ void RGWGetBucketTags_ObjStore_S3::send_response_data(bufferlist& bl)
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (!op_ret) {
@@ -788,7 +810,7 @@ int RGWPutBucketTags_ObjStore_S3::get_params(const DoutPrefixProvider *dpp, opti
   ldpp_dout(dpp, 20) << "Read " << obj_tags.count() << "tags" << dendl;
 
   // forward bucket tags requests to meta master zone
-  if (!store->is_meta_master()) {
+  if (!driver->is_meta_master()) {
     /* only need to keep this data around if we're not meta master */
     in_data = std::move(data);
   }
@@ -801,16 +823,20 @@ void RGWPutBucketTags_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
 void RGWDeleteBucketTags_ObjStore_S3::send_response()
 {
+  // A successful DeleteBucketTagging should
+  // return a 204 status code.
+  if (op_ret == 0)
+    op_ret = STATUS_NO_CONTENT;
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -1054,29 +1080,32 @@ struct ReplicationConfiguration {
       }
     };
 
-    set<rgw_zone_id> get_zone_ids_from_names(rgw::sal::Store* store,
+    set<rgw_zone_id> get_zone_ids_from_names(rgw::sal::Driver* driver,
                                              const vector<string>& zone_names) const {
       set<rgw_zone_id> ids;
 
       for (auto& name : zone_names) {
-        rgw_zone_id id;
-        if (static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->find_zone_id_by_name(name, &id)) {
-          ids.insert(std::move(id));
+       std::unique_ptr<rgw::sal::Zone> zone;
+       int ret = driver->get_zone()->get_zonegroup().get_zone_by_name(name, &zone);
+       if (ret >= 0) {
+         rgw_zone_id id = zone->get_id();
+         ids.insert(std::move(id));
         }
       }
 
       return ids;
     }
 
-    vector<string> get_zone_names_from_ids(rgw::sal::Store* store,
+    vector<string> get_zone_names_from_ids(rgw::sal::Driver* driver,
                                            const set<rgw_zone_id>& zone_ids) const {
       vector<string> names;
 
       for (auto& id : zone_ids) {
-        RGWZone *zone;
-        if (static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->find_zone(id, &zone)) {
-          names.emplace_back(zone->name);
-        }
+       std::unique_ptr<rgw::sal::Zone> zone;
+       int ret = driver->get_zone()->get_zonegroup().get_zone_by_id(id.id, &zone);
+       if (ret >= 0) {
+         names.emplace_back(zone->get_name());
+       }
       }
 
       return names;
@@ -1138,7 +1167,7 @@ struct ReplicationConfiguration {
       return true;
     }
 
-    int to_sync_policy_pipe(req_state *s, rgw::sal::Store* store,
+    int to_sync_policy_pipe(req_state *s, rgw::sal::Driver* driver,
                             rgw_sync_bucket_pipes *pipe,
                             bool *enabled) const {
       if (!is_valid(s->cct)) {
@@ -1154,12 +1183,12 @@ struct ReplicationConfiguration {
                              destination.bucket);
 
       if (source && !source->zone_names.empty()) {
-        pipe->source.zones = get_zone_ids_from_names(store, source->zone_names);
+        pipe->source.zones = get_zone_ids_from_names(driver, source->zone_names);
       } else {
         pipe->source.set_all_zones(true);
       }
       if (!destination.zone_names.empty()) {
-        pipe->dest.zones = get_zone_ids_from_names(store, destination.zone_names);
+        pipe->dest.zones = get_zone_ids_from_names(driver, destination.zone_names);
       } else {
         pipe->dest.set_all_zones(true);
       }
@@ -1189,7 +1218,7 @@ struct ReplicationConfiguration {
       return 0;
     }
 
-    void from_sync_policy_pipe(rgw::sal::Store* store,
+    void from_sync_policy_pipe(rgw::sal::Driver* driver,
                               const rgw_sync_bucket_pipes& pipe,
                               bool enabled) {
       id = pipe.id;
@@ -1200,12 +1229,12 @@ struct ReplicationConfiguration {
         source.reset();
       } else if (pipe.source.zones) {
         source.emplace();
-        source->zone_names = get_zone_names_from_ids(store, *pipe.source.zones);
+        source->zone_names = get_zone_names_from_ids(driver, *pipe.source.zones);
       }
 
       if (!pipe.dest.all_zones &&
           pipe.dest.zones) {
-        destination.zone_names = get_zone_names_from_ids(store, *pipe.dest.zones);
+        destination.zone_names = get_zone_names_from_ids(driver, *pipe.dest.zones);
       }
 
       if (pipe.params.dest.acl_translation) {
@@ -1242,7 +1271,7 @@ struct ReplicationConfiguration {
     encode_xml("Rule", rules, f);
   }
 
-  int to_sync_policy_groups(req_state *s, rgw::sal::Store* store,
+  int to_sync_policy_groups(req_state *s, rgw::sal::Driver* driver,
                             vector<rgw_sync_policy_group> *result) const {
     result->resize(2);
 
@@ -1257,7 +1286,7 @@ struct ReplicationConfiguration {
     for (auto& rule : rules) {
       rgw_sync_bucket_pipes pipe;
       bool enabled;
-      int r = rule.to_sync_policy_pipe(s, store, &pipe, &enabled);
+      int r = rule.to_sync_policy_pipe(s, driver, &pipe, &enabled);
       if (r < 0) {
         ldpp_dout(s, 5) << "NOTICE: failed to convert replication configuration into sync policy pipe (rule.id=" << rule.id << "): " << cpp_strerror(-r) << dendl;
         return r;
@@ -1272,14 +1301,14 @@ struct ReplicationConfiguration {
     return 0;
   }
 
-  void from_sync_policy_group(rgw::sal::Store* store,
+  void from_sync_policy_group(rgw::sal::Driver* driver,
                               const rgw_sync_policy_group& group) {
 
     bool enabled = (group.status == rgw_sync_policy_group::Status::ENABLED);
 
     for (auto& pipe : group.pipes) {
       auto& rule = rules.emplace_back();
-      rule.from_sync_policy_pipe(store, pipe, enabled);
+      rule.from_sync_policy_pipe(driver, pipe, enabled);
     }
   }
 };
@@ -1291,7 +1320,7 @@ void RGWGetBucketReplication_ObjStore_S3::send_response_data()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   ReplicationConfiguration conf;
@@ -1301,11 +1330,11 @@ void RGWGetBucketReplication_ObjStore_S3::send_response_data()
 
     auto iter = policy->groups.find(enabled_group_id);
     if (iter != policy->groups.end()) {
-      conf.from_sync_policy_group(store, iter->second);
+      conf.from_sync_policy_group(driver, iter->second);
     }
     iter = policy->groups.find(disabled_group_id);
     if (iter != policy->groups.end()) {
-      conf.from_sync_policy_group(store, iter->second);
+      conf.from_sync_policy_group(driver, iter->second);
     }
   }
 
@@ -1347,13 +1376,13 @@ int RGWPutBucketReplication_ObjStore_S3::get_params(optional_yield y)
     return -ERR_MALFORMED_XML;
   }
 
-  r = conf.to_sync_policy_groups(s, store, &sync_policy_groups);
+  r = conf.to_sync_policy_groups(s, driver, &sync_policy_groups);
   if (r < 0) {
     return r;
   }
 
   // forward requests to meta master zone
-  if (!store->is_meta_master()) {
+  if (!driver->is_meta_master()) {
     /* only need to keep this data around if we're not meta master */
     in_data = std::move(data);
   }
@@ -1366,7 +1395,7 @@ void RGWPutBucketReplication_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -1381,7 +1410,7 @@ void RGWDeleteBucketReplication_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -1393,7 +1422,7 @@ void RGWListBuckets_ObjStore_S3::send_response_begin(bool has_buckets)
   dump_start(s);
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, NULL, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, NULL, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
 
   if (! op_ret) {
     list_all_buckets_start(s);
@@ -1469,7 +1498,7 @@ void RGWGetUsage_ObjStore_S3::send_response()
 
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
   if (op_ret < 0)
     return;
@@ -1544,11 +1573,11 @@ void RGWGetUsage_ObjStore_S3::send_response()
 
      // send info about quota config
      auto user_info = s->user->get_info();
-     encode_json("QuotaMaxBytes", user_info.user_quota.max_size, formatter);
+     encode_json("QuotaMaxBytes", user_info.quota.user_quota.max_size, formatter);
      encode_json("QuotaMaxBuckets", user_info.max_buckets, formatter);
-     encode_json("QuotaMaxObjCount", user_info.user_quota.max_objects, formatter);
-     encode_json("QuotaMaxBytesPerBucket", user_info.bucket_quota.max_objects, formatter);
-     encode_json("QuotaMaxObjCountPerBucket", user_info.bucket_quota.max_size, formatter);
+     encode_json("QuotaMaxObjCount", user_info.quota.user_quota.max_objects, formatter);
+     encode_json("QuotaMaxBytesPerBucket", user_info.quota.bucket_quota.max_objects, formatter);
+     encode_json("QuotaMaxObjCountPerBucket", user_info.quota.bucket_quota.max_size, formatter);
      // send info about user's capacity utilization
      encode_json("TotalBytes", stats.size, formatter);
      encode_json("TotalBytesRounded", stats.size_rounded, formatter);
@@ -1758,7 +1787,11 @@ void RGWListBucket_ObjStore_S3::send_common_response()
   s->formatter->dump_string("Prefix", prefix);
   s->formatter->dump_int("MaxKeys", max);
   if (!delimiter.empty()) {
-    s->formatter->dump_string("Delimiter", delimiter);
+    if (encode_key) {
+      s->formatter->dump_string("Delimiter", url_encode(delimiter, false));
+    } else {
+      s->formatter->dump_string("Delimiter", delimiter);
+    }
   }
   s->formatter->dump_string("IsTruncated", (max && is_truncated ? "true"
               : "false"));
@@ -1787,7 +1820,7 @@ void RGWListBucket_ObjStore_S3::send_response()
 
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
   if (op_ret < 0) {
     return;
@@ -1802,43 +1835,61 @@ void RGWListBucket_ObjStore_S3::send_response()
     s->formatter->dump_string("EncodingType", "url");
     encode_key = true;
   }
+  
   RGWListBucket_ObjStore_S3::send_common_response();
-    if (op_ret >= 0) {
-      vector<rgw_bucket_dir_entry>::iterator iter;
-      for (iter = objs.begin(); iter != objs.end(); ++iter) {
-        rgw_obj_key key(iter->key);
-        s->formatter->open_array_section("Contents");
-        if (encode_key) {
-          string key_name;
-          url_encode(key.name, key_name);
-          s->formatter->dump_string("Key", key_name);
+  
+  if (op_ret >= 0) {
+    if (s->format == RGWFormat::JSON) {
+      s->formatter->open_array_section("Contents");
+    }
+    vector<rgw_bucket_dir_entry>::iterator iter;
+    for (iter = objs.begin(); iter != objs.end(); ++iter) {
+
+      rgw_obj_key key(iter->key);
+      std::string key_name;
+
+      if (encode_key) {
+       url_encode(key.name, key_name);
       } else {
-          s->formatter->dump_string("Key", key.name);
+       key_name = key.name;
       }
-        dump_time(s, "LastModified", iter->meta.mtime);
-        s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
-        s->formatter->dump_int("Size", iter->meta.accounted_size);
-        auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
-        s->formatter->dump_string("StorageClass", storage_class.c_str());
-        dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
-        if (s->system_request) {
-          s->formatter->dump_string("RgwxTag", iter->tag);
-      }
-        if (iter->meta.appendable) {
-          s->formatter->dump_string("Type", "Appendable");
+      /* conditionally format JSON in the obvious way--I'm unsure if
+       * AWS actually does this */
+      if (s->format == RGWFormat::XML) {
+       s->formatter->open_array_section("Contents");
       } else {
-          s->formatter->dump_string("Type", "Normal");
+       // json
+       s->formatter->open_object_section("dummy");
       }
-        s->formatter->close_section();
+      s->formatter->dump_string("Key", key_name);
+      dump_time(s, "LastModified", iter->meta.mtime);
+      s->formatter->dump_format("ETag", "\"%s\"", iter->meta.etag.c_str());
+      s->formatter->dump_int("Size", iter->meta.accounted_size);
+      auto& storage_class = rgw_placement_rule::get_canonical_storage_class(iter->meta.storage_class);
+      s->formatter->dump_string("StorageClass", storage_class.c_str());
+      dump_owner(s, rgw_user(iter->meta.owner), iter->meta.owner_display_name);
+      if (s->system_request) {
+       s->formatter->dump_string("RgwxTag", iter->tag);
+      }
+      if (iter->meta.appendable) {
+       s->formatter->dump_string("Type", "Appendable");
+       } else {
+       s->formatter->dump_string("Type", "Normal");
+      }
+      // JSON has one extra section per element
+      s->formatter->close_section();
+    } // foreach obj
+    if (s->format == RGWFormat::JSON) {
+      s->formatter->close_section();
     }
   }
-    s->formatter->dump_string("Marker", marker.name);
-    if (is_truncated && !next_marker.empty()) {
-      s->formatter->dump_string("NextMarker", next_marker.name);
-    }
-    s->formatter->close_section();
-    rgw_flush_formatter_and_reset(s, s->formatter);
-}
+  s->formatter->dump_string("Marker", marker.name);
+  if (is_truncated && !next_marker.empty()) {
+    s->formatter->dump_string("NextMarker", next_marker.name);
+  }
+  s->formatter->close_section();
+  rgw_flush_formatter_and_reset(s, s->formatter);
+} /* RGWListBucket_ObjStore_S3::send_response() */
 
 void RGWListBucket_ObjStore_S3v2::send_versioned_response()
 {
@@ -1943,7 +1994,7 @@ void RGWListBucket_ObjStore_S3v2::send_response()
 
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
   if (op_ret < 0) {
     return;
@@ -2009,7 +2060,7 @@ void RGWListBucket_ObjStore_S3v2::send_response()
 void RGWGetBucketLogging_ObjStore_S3::send_response()
 {
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   s->formatter->open_object_section_in_ns("BucketLoggingStatus", XMLNS_AWS_S3);
@@ -2023,12 +2074,12 @@ void RGWGetBucketLocation_ObjStore_S3::send_response()
   end_header(s, this);
   dump_start(s);
 
-  RGWZoneGroup zonegroup;
+  std::unique_ptr<rgw::sal::ZoneGroup> zonegroup;
   string api_name;
 
-  int ret = store->get_zone()->get_zonegroup(s->bucket->get_info().zonegroup, zonegroup);
+  int ret = driver->get_zonegroup(s->bucket->get_info().zonegroup, &zonegroup);
   if (ret >= 0) {
-    api_name = zonegroup.api_name;
+    api_name = zonegroup->get_api_name();
   } else  {
     if (s->bucket->get_info().zonegroup != "default") {
       api_name = s->bucket->get_info().zonegroup;
@@ -2045,7 +2096,7 @@ void RGWGetBucketVersioning_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   s->formatter->open_object_section_in_ns("VersioningConfiguration", XMLNS_AWS_S3);
@@ -2122,7 +2173,7 @@ int RGWSetBucketVersioning_ObjStore_S3::get_params(optional_yield y)
     return -EINVAL;
   }
 
-  if (!store->is_meta_master()) {
+  if (!driver->is_meta_master()) {
     /* only need to keep this data around if we're not meta master */
     in_data.append(data);
   }
@@ -2156,7 +2207,7 @@ void RGWSetBucketVersioning_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
 }
 
 int RGWSetBucketWebsite_ObjStore_S3::get_params(optional_yield y)
@@ -2233,7 +2284,7 @@ void RGWSetBucketWebsite_ObjStore_S3::send_response()
   if (op_ret < 0)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
 }
 
 void RGWDeleteBucketWebsite_ObjStore_S3::send_response()
@@ -2243,7 +2294,7 @@ void RGWDeleteBucketWebsite_ObjStore_S3::send_response()
   }
   set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
 }
 
 void RGWGetBucketWebsite_ObjStore_S3::send_response()
@@ -2251,7 +2302,7 @@ void RGWGetBucketWebsite_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (op_ret < 0) {
@@ -2266,18 +2317,18 @@ void RGWGetBucketWebsite_ObjStore_S3::send_response()
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-static void dump_bucket_metadata(struct req_state *s, rgw::sal::Bucket* bucket)
+static void dump_bucket_metadata(req_state *s, rgw::sal::Bucket* bucket)
 {
   dump_header(s, "X-RGW-Object-Count", static_cast<long long>(bucket->get_count()));
   dump_header(s, "X-RGW-Bytes-Used", static_cast<long long>(bucket->get_size()));
   // only bucket's owner is allowed to get the quota settings of the account
   if (bucket->is_owner(s->user.get())) {
     auto user_info = s->user->get_info();
-    dump_header(s, "X-RGW-Quota-User-Size", static_cast<long long>(user_info.user_quota.max_size));
-    dump_header(s, "X-RGW-Quota-User-Objects", static_cast<long long>(user_info.user_quota.max_objects));
+    dump_header(s, "X-RGW-Quota-User-Size", static_cast<long long>(user_info.quota.user_quota.max_size));
+    dump_header(s, "X-RGW-Quota-User-Objects", static_cast<long long>(user_info.quota.user_quota.max_objects));
     dump_header(s, "X-RGW-Quota-Max-Buckets", static_cast<long long>(user_info.max_buckets));
-    dump_header(s, "X-RGW-Quota-Bucket-Size", static_cast<long long>(user_info.bucket_quota.max_size));
-    dump_header(s, "X-RGW-Quota-Bucket-Objects", static_cast<long long>(user_info.bucket_quota.max_objects));
+    dump_header(s, "X-RGW-Quota-Bucket-Size", static_cast<long long>(user_info.quota.bucket_quota.max_size));
+    dump_header(s, "X-RGW-Quota-Bucket-Objects", static_cast<long long>(user_info.quota.bucket_quota.max_objects));
   }
 }
 
@@ -2294,7 +2345,7 @@ void RGWStatBucket_ObjStore_S3::send_response()
   dump_start(s);
 }
 
-static int create_s3_policy(struct req_state *s, rgw::sal::Store* store,
+static int create_s3_policy(req_state *s, rgw::sal::Driver* driver,
                            RGWAccessControlPolicy_S3& s3policy,
                            ACLOwner& owner)
 {
@@ -2302,7 +2353,7 @@ static int create_s3_policy(struct req_state *s, rgw::sal::Store* store,
     if (!s->canned_acl.empty())
       return -ERR_INVALID_REQUEST;
 
-    return s3policy.create_from_headers(s, store, s->info.env, owner);
+    return s3policy.create_from_headers(s, driver, s->info.env, owner);
   }
 
   return s3policy.create_canned(owner, s->bucket_owner, s->canned_acl);
@@ -2368,7 +2419,7 @@ int RGWCreateBucket_ObjStore_S3::get_params(optional_yield y)
     if (r) return r;
   }
 
-  r = create_s3_policy(s, store, s3policy, s->owner);
+  r = create_s3_policy(s, driver, s3policy, s->owner);
   if (r < 0)
     return r;
 
@@ -2463,7 +2514,7 @@ void RGWDeleteBucket_ObjStore_S3::send_response()
   end_header(s, this);
 }
 
-static inline void map_qs_metadata(struct req_state* s, bool crypto_too)
+static inline void map_qs_metadata(req_state* s, bool crypto_too)
 {
   /* merge S3 valid user metadata from the query-string into
    * x_meta_map, which maps them to attributes */
@@ -2481,8 +2532,15 @@ static inline void map_qs_metadata(struct req_state* s, bool crypto_too)
 
 int RGWPutObj_ObjStore_S3::get_params(optional_yield y)
 {
-  if (!s->length)
-    return -ERR_LENGTH_REQUIRED;
+  if (!s->length) {
+    const char *encoding = s->info.env->get("HTTP_TRANSFER_ENCODING");
+    if (!encoding || strcmp(encoding, "chunked") != 0) {
+      ldout(s->cct, 20) << "neither length nor chunked encoding" << dendl;
+      return -ERR_LENGTH_REQUIRED;
+    }
+
+    chunked_upload = true;
+  }
 
   int ret;
 
@@ -2494,7 +2552,7 @@ int RGWPutObj_ObjStore_S3::get_params(optional_yield y)
   }
 
   RGWAccessControlPolicy_S3 s3policy(s->cct);
-  ret = create_s3_policy(s, store, s3policy, s->owner);
+  ret = create_s3_policy(s, driver, s3policy, s->owner);
   if (ret < 0)
     return ret;
 
@@ -2635,7 +2693,7 @@ void RGWPutObj_ObjStore_S3::send_response()
       dump_errno(s);
       dump_header_if_nonempty(s, "x-amz-version-id", version_id);
       dump_header_if_nonempty(s, "x-amz-expiration", expires);
-      end_header(s, this, "application/xml");
+      end_header(s, this, to_mime_type(s->format));
       dump_start(s);
       struct tm tmp;
       utime_t ut(mtime);
@@ -2717,7 +2775,7 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter(
                                  multipart_upload_id);
     std::unique_ptr<rgw::sal::Object> obj = upload->get_meta_obj();
     obj->set_in_extra_data(true);
-    res = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
+    res = obj->get_obj_attrs(s->yield, this);
     if (res == 0) {
       std::unique_ptr<BlockCrypt> block_crypt;
       /* We are adding to existing object.
@@ -2870,7 +2928,7 @@ int RGWPostObj_ObjStore_S3::get_params(optional_yield y)
 
   if (! storage_class.empty()) {
     s->dest_placement.storage_class = storage_class;
-    if (!store->get_zone()->get_params().valid_placement(s->dest_placement)) {
+    if (!driver->valid_placement(s->dest_placement)) {
       ldpp_dout(this, 0) << "NOTICE: invalid dest placement: " << s->dest_placement.to_str() << dendl;
       err_msg = "The storage class you specified is not valid";
       return -EINVAL;
@@ -3336,7 +3394,7 @@ int RGWCopyObj_ObjStore_S3::init_dest_policy()
   RGWAccessControlPolicy_S3 s3policy(s->cct);
 
   /* build a policy for the target object */
-  int r = create_s3_policy(s, store, s3policy, s->owner);
+  int r = create_s3_policy(s, driver, s3policy, s->owner);
   if (r < 0)
     return r;
 
@@ -3378,16 +3436,10 @@ int RGWCopyObj_ObjStore_S3::get_params(optional_yield y)
     obj_legal_hold = new RGWObjectLegalHold(obj_legal_hold_str);
   }
 
-  if_mod = s->info.env->get("HTTP_X_AMZ_COPY_IF_MODIFIED_SINCE");
-  if_unmod = s->info.env->get("HTTP_X_AMZ_COPY_IF_UNMODIFIED_SINCE");
-  if_match = s->info.env->get("HTTP_X_AMZ_COPY_IF_MATCH");
-  if_nomatch = s->info.env->get("HTTP_X_AMZ_COPY_IF_NONE_MATCH");
-
-  src_tenant_name = s->src_tenant_name;
-  src_bucket_name = s->src_bucket_name;
-  dest_tenant_name = s->bucket->get_tenant();
-  dest_bucket_name = s->bucket->get_name();
-  dest_obj_name = s->object->get_name();
+  if_mod = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE");
+  if_unmod = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE");
+  if_match = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_IF_MATCH");
+  if_nomatch = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_IF_NONE_MATCH");
 
   if (s->system_request) {
     source_zone = s->info.args.get(RGW_SYS_PARAM_PREFIX "source-zone");
@@ -3412,9 +3464,9 @@ int RGWCopyObj_ObjStore_S3::get_params(optional_yield y)
   }
 
   if (source_zone.empty() &&
-      (dest_tenant_name.compare(src_tenant_name) == 0) &&
-      (dest_bucket_name.compare(src_bucket_name) == 0) &&
-      (dest_obj_name.compare(s->src_object->get_name()) == 0) &&
+      (s->bucket->get_tenant() == s->src_tenant_name) &&
+      (s->bucket->get_name() == s->src_bucket_name) &&
+      (s->object->get_name() == s->src_object->get_name()) &&
       s->src_object->get_instance().empty() &&
       (attrs_mod != rgw::sal::ATTRSMOD_REPLACE)) {
     need_to_check_storage_class = true;
@@ -3445,7 +3497,7 @@ void RGWCopyObj_ObjStore_S3::send_partial_response(off_t ofs)
 
     // Explicitly use chunked transfer encoding so that we can stream the result
     // to the user without having to wait for the full length of it.
-    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+    end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
     dump_start(s);
     if (op_ret == 0) {
       s->formatter->open_object_section_in_ns("CopyObjectResult", XMLNS_AWS_S3);
@@ -3468,7 +3520,7 @@ void RGWCopyObj_ObjStore_S3::send_response()
   if (op_ret == 0) {
     dump_time(s, "LastModified", mtime);
     if (!etag.empty()) {
-      s->formatter->dump_string("ETag", std::move(etag));
+      s->formatter->dump_format("ETag", "\"%s\"",etag.c_str());
     }
     s->formatter->close_section();
     rgw_flush_formatter_and_reset(s, s->formatter);
@@ -3480,7 +3532,7 @@ void RGWGetACLs_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
   rgw_flush_formatter(s, s->formatter);
   dump_body(s, acls);
@@ -3507,8 +3559,8 @@ int RGWPutACLs_ObjStore_S3::get_params(optional_yield y)
   return ret;
 }
 
-int RGWPutACLs_ObjStore_S3::get_policy_from_state(rgw::sal::Store* store,
-                                                 struct req_state *s,
+int RGWPutACLs_ObjStore_S3::get_policy_from_state(rgw::sal::Driver* driver,
+                                                 req_state *s,
                                                  stringstream& ss)
 {
   RGWAccessControlPolicy_S3 s3policy(s->cct);
@@ -3519,7 +3571,7 @@ int RGWPutACLs_ObjStore_S3::get_policy_from_state(rgw::sal::Store* store,
       s->canned_acl.clear();
   }
 
-  int r = create_s3_policy(s, store, s3policy, owner);
+  int r = create_s3_policy(s, driver, s3policy, owner);
   if (r < 0)
     return r;
 
@@ -3533,7 +3585,7 @@ void RGWPutACLs_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -3567,7 +3619,7 @@ void RGWGetLC_ObjStore_S3::send_response()
     }
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (op_ret < 0)
@@ -3582,7 +3634,7 @@ void RGWPutLC_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -3594,7 +3646,7 @@ void RGWDeleteLC_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -3607,7 +3659,7 @@ void RGWGetCORS_ObjStore_S3::send_response()
       set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, NULL, "application/xml");
+  end_header(s, NULL, to_mime_type(s->format));
   dump_start(s);
   if (! op_ret) {
     string cors;
@@ -3668,7 +3720,7 @@ int RGWPutCORS_ObjStore_S3::get_params(optional_yield y)
   }
 
   // forward bucket cors requests to meta master zone
-  if (!store->is_meta_master()) {
+  if (!driver->is_meta_master()) {
     /* only need to keep this data around if we're not meta master */
     in_data.append(data);
   }
@@ -3689,7 +3741,7 @@ void RGWPutCORS_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, NULL, "application/xml");
+  end_header(s, NULL, to_mime_type(s->format));
   dump_start(s);
 }
 
@@ -3746,7 +3798,7 @@ void RGWGetBucketEncryption_ObjStore_S3::send_response()
   }
 
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (!op_ret) {
@@ -3770,7 +3822,7 @@ void RGWDeleteBucketEncryption_ObjStore_S3::send_response()
 void RGWGetRequestPayment_ObjStore_S3::send_response()
 {
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   s->formatter->open_object_section_in_ns("RequestPaymentConfiguration", XMLNS_AWS_S3);
@@ -3851,10 +3903,18 @@ void RGWSetRequestPayment_ObjStore_S3::send_response()
 
 int RGWInitMultipart_ObjStore_S3::get_params(optional_yield y)
 {
+  int ret;
+
+  ret = get_encryption_defaults(s);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << __func__ << "(): get_encryption_defaults() returned ret=" << ret << dendl;
+    return ret;
+  }
+
   RGWAccessControlPolicy_S3 s3policy(s->cct);
-  op_ret = create_s3_policy(s, store, s3policy, s->owner);
-  if (op_ret < 0)
-    return op_ret;
+  ret = create_s3_policy(s, driver, s3policy, s->owner);
+  if (ret < 0)
+    return ret;
 
   policy = s3policy;
 
@@ -3875,7 +3935,7 @@ void RGWInitMultipart_ObjStore_S3::send_response()
     dump_time_header(s, "x-amz-abort-date", abort_date);
     dump_header_if_nonempty(s, "x-amz-abort-rule-id", rule_id);
   }
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   if (op_ret == 0) {
     dump_start(s);
     s->formatter->open_object_section_in_ns("InitiateMultipartUploadResult", XMLNS_AWS_S3);
@@ -3914,7 +3974,7 @@ void RGWCompleteMultipart_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   dump_errno(s);
   dump_header_if_nonempty(s, "x-amz-version-id", version_id);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   if (op_ret == 0) {
     dump_start(s);
     s->formatter->open_object_section_in_ns("CompleteMultipartUploadResult", XMLNS_AWS_S3);
@@ -3960,7 +4020,7 @@ void RGWListMultipart_ObjStore_S3::send_response()
   dump_errno(s);
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
 
   if (op_ret == 0) {
     dump_start(s);
@@ -4013,7 +4073,7 @@ void RGWListBucketMultiparts_ObjStore_S3::send_response()
 
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
   dump_start(s);
   if (op_ret < 0)
     return;
@@ -4107,15 +4167,17 @@ void RGWDeleteMultiObj_ObjStore_S3::begin_response()
   dump_start(s);
   // Explicitly use chunked transfer encoding so that we can stream the result
   // to the user without having to wait for the full length of it.
-  end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  end_header(s, this, to_mime_type(s->format), CHUNKED_TRANSFER_ENCODING);
   s->formatter->open_object_section_in_ns("DeleteResult", XMLNS_AWS_S3);
 
   rgw_flush_formatter(s, s->formatter);
 }
 
-void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
+void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(const rgw_obj_key& key,
                                                          bool delete_marker,
-                                                         const string& marker_version_id, int ret)
+                                                         const string& marker_version_id,
+                                                          int ret,
+                                                          boost::asio::deadline_timer *formatter_flush_cond)
 {
   if (!key.empty()) {
     delete_multi_obj_entry ops_log_entry;
@@ -4161,7 +4223,11 @@ void RGWDeleteMultiObj_ObjStore_S3::send_partial_response(rgw_obj_key& key,
     }
 
     ops_log_entries.push_back(std::move(ops_log_entry));
-    rgw_flush_formatter(s, s->formatter);
+    if (formatter_flush_cond) {
+      formatter_flush_cond->cancel();
+    } else {
+      rgw_flush_formatter(s, s->formatter);
+    }
   }
 }
 
@@ -4186,7 +4252,7 @@ void RGWGetObjLayout_ObjStore_S3::send_response()
   }
 
   f.open_object_section("result");
-  s->object->dump_obj_layout(this, s->yield, &f, s->obj_ctx);
+  s->object->dump_obj_layout(this, s->yield, &f);
   f.close_section();
   rgw_flush_formatter(s, &f);
 }
@@ -4265,7 +4331,7 @@ void RGWGetBucketMetaSearch_ObjStore_S3::send_response()
   if (op_ret)
     set_req_state_err(s, op_ret);
   dump_errno(s);
-  end_header(s, NULL, "application/xml");
+  end_header(s, NULL, to_mime_type(s->format));
 
   Formatter *f = s->formatter;
   f->open_array_section("GetBucketMetaSearchResult");
@@ -4314,7 +4380,7 @@ void RGWGetBucketObjectLock_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (op_ret) {
@@ -4353,7 +4419,7 @@ void RGWGetObjRetention_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (op_ret) {
@@ -4378,7 +4444,7 @@ void RGWGetObjLegalHold_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   if (op_ret) {
@@ -4394,7 +4460,7 @@ void RGWGetBucketPolicyStatus_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   s->formatter->open_object_section_in_ns("PolicyStatus", XMLNS_AWS_S3);
@@ -4423,7 +4489,7 @@ void RGWGetBucketPublicAccessBlock_ObjStore_S3::send_response()
     set_req_state_err(s, op_ret);
   }
   dump_errno(s);
-  end_header(s, this, "application/xml");
+  end_header(s, this, to_mime_type(s->format));
   dump_start(s);
 
   access_conf.dump_xml(s->formatter);
@@ -4444,49 +4510,6 @@ RGWOp *RGWHandler_REST_Service_S3::op_head()
   return new RGWListBuckets_ObjStore_S3;
 }
 
-RGWOp *RGWHandler_REST_Service_S3::op_post()
-{
-  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
-
-  int ret;
-  bufferlist data;
-  std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
-  if (ret < 0) {
-      return nullptr;
-  }
-
-  const auto post_body = data.to_str();
-
-  if (isSTSEnabled) {
-    RGWHandler_REST_STS sts_handler(auth_registry, post_body);
-    sts_handler.init(store, s, s->cio);
-    auto op = sts_handler.get_op();
-    if (op) {
-      return op;
-    }
-  }
-
-  if (isIAMEnabled) {
-    RGWHandler_REST_IAM iam_handler(auth_registry, data);
-    iam_handler.init(store, s, s->cio);
-    auto op = iam_handler.get_op();
-    if (op) {
-      return op;
-    }
-  }
-
-  if (isPSEnabled) {
-    RGWHandler_REST_PSTopic_AWS topic_handler(auth_registry, post_body);
-    topic_handler.init(store, s, s->cio);
-    auto op = topic_handler.get_op();
-    if (op) {
-      return op;
-    }
-  }
-
-  return nullptr;
-}
-
 RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data) const
 {
   // Non-website mode
@@ -4602,8 +4625,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
   } else if (is_notification_op()) {
     return RGWHandler_REST_PSNotifs_S3::create_put_op();
   } else if (is_replication_op()) {
-    auto sync_policy_handler = static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->get_sync_policy_handler(nullopt);
-    if (!sync_policy_handler ||
+    RGWBucketSyncPolicyHandlerRef sync_policy_handler;
+    int ret = driver->get_sync_policy_handler(s, nullopt, nullopt,
+                                            &sync_policy_handler, null_yield);
+    if (ret < 0 || !sync_policy_handler ||
         sync_policy_handler->is_legacy_config()) {
       return nullptr;
     }
@@ -4758,9 +4783,9 @@ RGWOp *RGWHandler_REST_Obj_S3::op_options()
   return new RGWOptionsCORS_ObjStore_S3;
 }
 
-int RGWHandler_REST_S3::init_from_header(rgw::sal::Store* store,
-                                        struct req_state* s,
-                                        int default_formatter,
+int RGWHandler_REST_S3::init_from_header(rgw::sal::Driver* driver,
+                                        req_state* s,
+                                        RGWFormat default_formatter,
                                         bool configurable_format)
 {
   string req;
@@ -4823,50 +4848,19 @@ int RGWHandler_REST_S3::init_from_header(rgw::sal::Store* store,
       if (s->bucket) {
        s->object = s->bucket->get_object(rgw_obj_key(encoded_obj_str, s->info.args.get("versionId")));
       } else {
-       s->object = store->get_object(rgw_obj_key(encoded_obj_str, s->info.args.get("versionId")));
+       s->object = driver->get_object(rgw_obj_key(encoded_obj_str, s->info.args.get("versionId")));
       }
     }
   } else {
     if (s->bucket) {
       s->object = s->bucket->get_object(rgw_obj_key(req_name, s->info.args.get("versionId")));
     } else {
-      s->object = store->get_object(rgw_obj_key(req_name, s->info.args.get("versionId")));
+      s->object = driver->get_object(rgw_obj_key(req_name, s->info.args.get("versionId")));
     }
   }
   return 0;
 }
 
-static int verify_mfa(rgw::sal::Store* store, RGWUserInfo *user,
-                     const string& mfa_str, bool *verified, const DoutPrefixProvider *dpp, optional_yield y)
-{
-  vector<string> params;
-  get_str_vec(mfa_str, " ", params);
-
-  if (params.size() != 2) {
-    ldpp_dout(dpp, 5) << "NOTICE: invalid mfa string provided: " << mfa_str << dendl;
-    return -EINVAL;
-  }
-
-  string& serial = params[0];
-  string& pin = params[1];
-
-  auto i = user->mfa_ids.find(serial);
-  if (i == user->mfa_ids.end()) {
-    ldpp_dout(dpp, 5) << "NOTICE: user does not have mfa device with serial=" << serial << dendl;
-    return -EACCES;
-  }
-
-  int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->cls->mfa.check_mfa(dpp, user->user_id, serial, pin, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 20) << "NOTICE: failed to check MFA, serial=" << serial << dendl;
-    return -EACCES;
-  }
-
-  *verified = true;
-
-  return 0;
-}
-
 int RGWHandler_REST_S3::postauth_init(optional_yield y)
 {
   struct req_init_state *t = &s->init_state;
@@ -4911,13 +4905,13 @@ int RGWHandler_REST_S3::postauth_init(optional_yield y)
 
   const char *mfa = s->info.env->get("HTTP_X_AMZ_MFA");
   if (mfa) {
-    ret = verify_mfa(store, &s->user->get_info(), string(mfa), &s->mfa_verified, s, y);
+    ret = s->user->verify_mfa(string(mfa), &s->mfa_verified, s, y);
   }
 
   return 0;
 }
 
-int RGWHandler_REST_S3::init(rgw::sal::Store* store, struct req_state *s,
+int RGWHandler_REST_S3::init(rgw::sal::Driver* driver, req_state *s,
                              rgw::io::BasicClient *cio)
 {
   int ret;
@@ -4953,7 +4947,7 @@ int RGWHandler_REST_S3::init(rgw::sal::Store* store, struct req_state *s,
       ldpp_dout(s, 0) << "failed to parse copy location" << dendl;
       return -EINVAL; // XXX why not -ERR_INVALID_BUCKET_NAME or -ERR_BAD_URL?
     }
-    s->src_object = store->get_object(key);
+    s->src_object = driver->get_object(key);
   }
 
   const char *sc = s->info.env->get("HTTP_X_AMZ_STORAGE_CLASS");
@@ -4961,15 +4955,15 @@ int RGWHandler_REST_S3::init(rgw::sal::Store* store, struct req_state *s,
     s->info.storage_class = sc;
   }
 
-  return RGWHandler_REST::init(store, s, cio);
+  return RGWHandler_REST::init(driver, s, cio);
 }
 
 int RGWHandler_REST_S3::authorize(const DoutPrefixProvider *dpp, optional_yield y)
 {
   if (s->info.args.exists("Action") && s->info.args.get("Action") == "AssumeRoleWithWebIdentity") {
-    return RGW_Auth_STS::authorize(dpp, store, auth_registry, s, y);
+    return RGW_Auth_STS::authorize(dpp, driver, auth_registry, s, y);
   }
-  return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
+  return RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y);
 }
 
 enum class AwsVersion {
@@ -5027,15 +5021,15 @@ discover_aws_flavour(const req_info& info)
  * it tries AWS v4 before AWS v2
  */
 int RGW_Auth_S3::authorize(const DoutPrefixProvider *dpp,
-                           rgw::sal::Store* const store,
+                           rgw::sal::Driver* const driver,
                            const rgw::auth::StrategyRegistry& auth_registry,
-                           struct req_state* const s, optional_yield y)
+                           req_state* const s, optional_yield y)
 {
 
   /* neither keystone and rados enabled; warn and exit! */
-  if (!store->ctx()->_conf->rgw_s3_auth_use_rados &&
-      !store->ctx()->_conf->rgw_s3_auth_use_keystone &&
-      !store->ctx()->_conf->rgw_s3_auth_use_ldap) {
+  if (!driver->ctx()->_conf->rgw_s3_auth_use_rados &&
+      !driver->ctx()->_conf->rgw_s3_auth_use_keystone &&
+      !driver->ctx()->_conf->rgw_s3_auth_use_ldap) {
     ldpp_dout(dpp, 0) << "WARNING: no authorization backend enabled! Users will never authenticate." << dendl;
     return -EPERM;
   }
@@ -5049,54 +5043,186 @@ int RGW_Auth_S3::authorize(const DoutPrefixProvider *dpp,
   return ret;
 }
 
-int RGWHandler_Auth_S3::init(rgw::sal::Store* store, struct req_state *state,
+int RGWHandler_Auth_S3::init(rgw::sal::Driver* driver, req_state *state,
                              rgw::io::BasicClient *cio)
 {
-  int ret = RGWHandler_REST_S3::init_from_header(store, state, RGW_FORMAT_JSON, true);
+  int ret = RGWHandler_REST_S3::init_from_header(driver, state, RGWFormat::JSON, true);
   if (ret < 0)
     return ret;
 
-  return RGWHandler_REST::init(store, state, cio);
+  return RGWHandler_REST::init(driver, state, cio);
+}
+
+namespace {
+// utility classes and functions for handling parameters with the following format:
+// Attributes.entry.{N}.{key|value}={VALUE}
+// N - any unsigned number
+// VALUE - url encoded string
+
+// and Attribute is holding key and value
+// ctor and set are done according to the "type" argument
+// if type is not "key" or "value" its a no-op
+class Attribute {
+  std::string key;
+  std::string value;
+public:
+  Attribute(const std::string& type, const std::string& key_or_value) {
+    set(type, key_or_value);
+  }
+  void set(const std::string& type, const std::string& key_or_value) {
+    if (type == "key") {
+      key = key_or_value;
+    } else if (type == "value") {
+      value = key_or_value;
+    }
+  }
+  const std::string& get_key() const { return key; }
+  const std::string& get_value() const { return value; }
+};
+
+using AttributeMap = std::map<unsigned, Attribute>;
+
+// aggregate the attributes into a map
+// the key and value are associated by the index (N)
+// no assumptions are made on the order in which these parameters are added
+void update_attribute_map(const std::string& input, AttributeMap& map) {
+  const boost::char_separator<char> sep(".");
+  const boost::tokenizer tokens(input, sep);
+  auto token = tokens.begin();
+  if (*token != "Attributes") {
+      return;
+  }
+  ++token;
+
+  if (*token != "entry") {
+      return;
+  }
+  ++token;
+
+  unsigned idx;
+  try {
+    idx = std::stoul(*token);
+  } catch (const std::invalid_argument&) {
+    return;
+  }
+  ++token;
+
+  std::string key_or_value = "";
+  // get the rest of the string regardless of dots
+  // this is to allow dots in the value
+  while (token != tokens.end()) {
+    key_or_value.append(*token+".");
+    ++token;
+  }
+  // remove last separator
+  key_or_value.pop_back();
+
+  auto pos = key_or_value.find("=");
+  if (pos != std::string::npos) {
+    const auto key_or_value_lhs = key_or_value.substr(0, pos);
+    const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
+    const auto map_it = map.find(idx);
+    if (map_it == map.end()) {
+      // new entry
+      map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
+    } else {
+      // existing entry
+      map_it->second.set(key_or_value_lhs, key_or_value_rhs);
+    }
+  }
 }
+}
+
+void parse_post_action(const std::string& post_body, req_state* s)
+{
+  if (post_body.size() > 0) {
+    ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
 
-RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::Store* store,
-                                           struct req_state* const s,
+    if (post_body.find("Action") != string::npos) {
+      const boost::char_separator<char> sep("&");
+      const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
+      AttributeMap map;
+      for (const auto& t : tokens) {
+        const auto pos = t.find("=");
+        if (pos != string::npos) {
+          const auto key = t.substr(0, pos);
+          if (boost::starts_with(key, "Attributes.")) {
+            update_attribute_map(t, map);
+          } else {
+            s->info.args.append(t.substr(0, pos),
+                              url_decode(t.substr(pos+1, t.size() -1)));
+          }
+        }
+      }
+      // update the regular args with the content of the attribute map
+      for (const auto& attr : map) {
+          s->info.args.append(attr.second.get_key(), attr.second.get_value());
+      }
+    }
+  }
+  const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
+  s->info.args.append("PayloadHash", payload_hash);
+}
+
+RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::Driver* driver,
+                                           req_state* const s,
                                             const rgw::auth::StrategyRegistry& auth_registry,
                                             const std::string& frontend_prefix)
 {
   bool is_s3website = enable_s3website && (s->prot_flags & RGW_REST_WEBSITE);
   int ret =
-    RGWHandler_REST_S3::init_from_header(store, s,
-                                       is_s3website ? RGW_FORMAT_HTML :
-                                       RGW_FORMAT_XML, true);
-  if (ret < 0)
-    return NULL;
+    RGWHandler_REST_S3::init_from_header(driver, s,
+                                       is_s3website ? RGWFormat::HTML :
+                                       RGWFormat::XML, true);
+  if (ret < 0) {
+    return nullptr;
+  }
 
-  RGWHandler_REST* handler;
-  // TODO: Make this more readable
   if (is_s3website) {
     if (s->init_state.url_bucket.empty()) {
-      handler = new RGWHandler_REST_Service_S3Website(auth_registry);
-    } else if (rgw::sal::Object::empty(s->object.get())) {
-      handler = new RGWHandler_REST_Bucket_S3Website(auth_registry);
-    } else {
-      handler = new RGWHandler_REST_Obj_S3Website(auth_registry);
+      return new RGWHandler_REST_Service_S3Website(auth_registry);
     }
-  } else {
-    if (s->init_state.url_bucket.empty()) {
-      handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam, enable_pubsub);
-    } else if (!rgw::sal::Object::empty(s->object.get())) {
-      handler = new RGWHandler_REST_Obj_S3(auth_registry);
-    } else if (s->info.args.exist_obj_excl_sub_resource()) {
-      return NULL;
-    } else {
-      handler = new RGWHandler_REST_Bucket_S3(auth_registry, enable_pubsub);
+    if (rgw::sal::Object::empty(s->object.get())) {
+      return new RGWHandler_REST_Bucket_S3Website(auth_registry);
     }
+    return new RGWHandler_REST_Obj_S3Website(auth_registry);
   }
 
-  ldpp_dout(s, 20) << __func__ << " handler=" << typeid(*handler).name()
-                   << dendl;
-  return handler;
+  if (s->init_state.url_bucket.empty()) {
+    // no bucket
+    if (s->op == OP_POST) {
+      // POST will be one of: IAM, STS or topic service
+      const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+      int ret;
+      bufferlist data;
+      std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
+      if (ret < 0) {
+        return nullptr;
+      }
+      parse_post_action(data.to_str(), s);
+      if (enable_sts && RGWHandler_REST_STS::action_exists(s)) {
+        return new RGWHandler_REST_STS(auth_registry);
+      }
+      if (enable_iam && RGWHandler_REST_IAM::action_exists(s)) {
+        return new RGWHandler_REST_IAM(auth_registry, data);
+      }
+      if (enable_pubsub && RGWHandler_REST_PSTopic_AWS::action_exists(s)) {
+        return new RGWHandler_REST_PSTopic_AWS(auth_registry); 
+      }
+      return nullptr;
+    }
+    // non-POST S3 service without a bucket
+    return new RGWHandler_REST_Service_S3(auth_registry);
+  }
+  if (!rgw::sal::Object::empty(s->object.get())) {
+    // has object
+    return new RGWHandler_REST_Obj_S3(auth_registry);
+  }
+  if (s->info.args.exist_obj_excl_sub_resource()) {
+    return nullptr;
+  }
+  // has bucket
+  return new RGWHandler_REST_Bucket_S3(auth_registry, enable_pubsub);
 }
 
 bool RGWHandler_REST_S3Website::web_dir() const {
@@ -5113,12 +5239,11 @@ bool RGWHandler_REST_S3Website::web_dir() const {
 
   std::unique_ptr<rgw::sal::Object> obj = s->bucket->get_object(rgw_obj_key(subdir_name));
 
-  RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-  obj->set_atomic(&obj_ctx);
-  obj->set_prefetch_data(&obj_ctx);
+  obj->set_atomic();
+  obj->set_prefetch_data();
 
   RGWObjState* state = nullptr;
-  if (obj->get_obj_state(s, &obj_ctx, &state, s->yield) < 0) {
+  if (obj->get_obj_state(s, &state, s->yield) < 0) {
     return false;
   }
   if (! state->exists) {
@@ -5127,7 +5252,7 @@ bool RGWHandler_REST_S3Website::web_dir() const {
   return state->exists;
 }
 
-int RGWHandler_REST_S3Website::init(rgw::sal::Store* store, req_state *s,
+int RGWHandler_REST_S3Website::init(rgw::sal::Driver* driver, req_state *s,
                                     rgw::io::BasicClient* cio)
 {
   // save the original object name before retarget() replaces it with the
@@ -5139,7 +5264,7 @@ int RGWHandler_REST_S3Website::init(rgw::sal::Store* store, req_state *s,
     original_object_name = "";
   }
 
-  return RGWHandler_REST_S3::init(store, s, cio);
+  return RGWHandler_REST_S3::init(driver, s, cio);
 }
 
 int RGWHandler_REST_S3Website::retarget(RGWOp* op, RGWOp** new_op, optional_yield y) {
@@ -5223,7 +5348,7 @@ int RGWHandler_REST_S3Website::serve_errordoc(const DoutPrefixProvider *dpp, int
   if (getop.get() == NULL) {
     return -1; // Trigger double error handler
   }
-  getop->init(store, s, this);
+  getop->init(driver, s, this);
   getop->range_str = NULL;
   getop->if_mod = NULL;
   getop->if_unmod = NULL;
@@ -5232,7 +5357,7 @@ int RGWHandler_REST_S3Website::serve_errordoc(const DoutPrefixProvider *dpp, int
   /* This is okay.  It's an error, so nothing will run after this, and it can be
    * called by abort_early(), which can be called before s->object or s->bucket
    * are set up. Note, it won't have bucket. */
-  s->object = store->get_object(errordoc_key);
+  s->object = driver->get_object(errordoc_key);
 
   ret = init_permissions(getop.get(), y);
   if (ret < 0) {
@@ -5676,6 +5801,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
         case RGW_OP_DELETE_MULTI_OBJ:
         case RGW_OP_ADMIN_SET_METADATA:
         case RGW_OP_SYNC_DATALOG_NOTIFY:
+        case RGW_OP_SYNC_DATALOG_NOTIFY2:
         case RGW_OP_SYNC_MDLOG_NOTIFY:
         case RGW_OP_PERIOD_POST:
         case RGW_OP_SET_BUCKET_WEBSITE:
@@ -5699,7 +5825,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
        case RGW_OP_GET_OBJ://s3select its post-method(payload contain the query) , the request is get-object
           break;
         default:
-          ldpp_dout(s, 10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
+          ldpp_dout(s, 10) << "ERROR: AWS4 completion for operation: " << s->op_type << ", NOT IMPLEMENTED" << dendl;
           throw -ERR_NOT_IMPLEMENTED;
       }
 
@@ -6024,7 +6150,7 @@ rgw::auth::s3::LDAPEngine::authenticate(
   //return error.
   /*RGWUserInfo user_info;
   user_info.user_id = base64_token.id;
-  if (rgw_get_user_info_by_uid(store, user_info.user_id, user_info) >= 0) {
+  if (rgw_get_user_info_by_uid(driver, user_info.user_id, user_info) >= 0) {
     if (user_info.type != TYPE_LDAP) {
       ldpp_dout(dpp, 10) << "ERROR: User id of type: " << user_info.type << " is already present" << dendl;
       return nullptr;
@@ -6064,7 +6190,7 @@ rgw::auth::s3::LocalEngine::authenticate(
   std::unique_ptr<rgw::sal::User> user;
   const std::string access_key_id(_access_key_id);
   /* TODO(rzarzynski): we need to have string-view taking variant. */
-  if (store->get_user_by_access_key(dpp, access_key_id, y, &user) < 0) {
+  if (driver->get_user_by_access_key(dpp, access_key_id, y, &user) < 0) {
       ldpp_dout(dpp, 5) << "error reading user info, uid=" << access_key_id
               << " can't authenticate" << dendl;
       return result_t::deny(-ERR_INVALID_ACCESS_KEY);
@@ -6241,7 +6367,7 @@ rgw::auth::s3::STSEngine::authenticate(
   rgw::auth::RoleApplier::Role r;
   rgw::auth::RoleApplier::TokenAttrs t_attrs;
   if (! token.roleId.empty()) {
-    std::unique_ptr<rgw::sal::RGWRole> role = store->get_role(token.roleId);
+    std::unique_ptr<rgw::sal::RGWRole> role = driver->get_role(token.roleId);
     if (role->get_by_id(dpp, y) < 0) {
       return result_t::deny(-EPERM);
     }
@@ -6258,7 +6384,7 @@ rgw::auth::s3::STSEngine::authenticate(
     }
   }
 
-  user = store->get_user(token.user);
+  user = driver->get_user(token.user);
   if (! token.user.empty() && token.acct_type != TYPE_ROLE) {
     // get user info
     int ret = user->load_user(dpp, y);