]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rest_client.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
index fada94f57643ae069d1b9f4f47462dd4b52656a1..16730cfda1f74340a61091532fbeb9668868208f 100644 (file)
@@ -1,13 +1,11 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "rgw_common.h"
 #include "rgw_rest_client.h"
 #include "rgw_auth_s3.h"
 #include "rgw_http_errors.h"
-#include "rgw_rados.h"
 
-#include "common/ceph_crypto_cms.h"
 #include "common/armor.h"
 #include "common/strtol.h"
 #include "include/str_list.h"
@@ -16,7 +14,9 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-int RGWRESTSimpleRequest::get_status()
+using namespace std;
+
+int RGWHTTPSimpleRequest::get_status()
 {
   int retcode = get_req_retcode();
   if (retcode < 0) {
@@ -25,13 +25,13 @@ int RGWRESTSimpleRequest::get_status()
   return status;
 }
 
-int RGWRESTSimpleRequest::handle_header(const string& name, const string& val) 
+int RGWHTTPSimpleRequest::handle_header(const string& name, const string& val) 
 {
   if (name == "CONTENT_LENGTH") {
     string err;
     long len = strict_strtol(val.c_str(), 10, &err);
     if (!err.empty()) {
-      ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
+      ldpp_dout(this, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
       return -EINVAL;
     }
 
@@ -41,13 +41,15 @@ int RGWRESTSimpleRequest::handle_header(const string& name, const string& val)
   return 0;
 }
 
-int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len)
 {
+  unique_lock guard(out_headers_lock);
+
   char line[len + 1];
 
   char *s = (char *)ptr, *end = (char *)ptr + len;
   char *p = line;
-  ldout(cct, 10) << "receive_http_header" << dendl;
+  ldpp_dout(this, 30) << "receive_http_header" << dendl;
 
   while (s != end) {
     if (*s == '\r') {
@@ -56,7 +58,7 @@ int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
     }
     if (*s == '\n') {
       *p = '\0';
-      ldout(cct, 10) << "received header:" << line << dendl;
+      ldpp_dout(this, 30) << "received header:" << line << dendl;
       // TODO: fill whatever data required here
       char *l = line;
       char *tok = strsep(&l, " \t:");
@@ -102,50 +104,21 @@ static void get_new_date_str(string& date_str)
   date_str = rgw_to_asctime(ceph_clock_now());
 }
 
-int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
+static void get_gmt_date_str(string& date_str)
 {
-  string new_url = url;
-  string new_resource = resource;
+  auto now_time = ceph::real_clock::now();
+  time_t rawtime = ceph::real_clock::to_time_t(now_time);
 
-  if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
-    new_url = new_url.substr(0, new_url.size() - 1);
-  } else if (resource[0] != '/') {
-    new_resource = "/";
-    new_resource.append(resource);
-  }
-  new_url.append(new_resource);
+  char buffer[80];
 
-  string date_str;
-  get_new_date_str(date_str);
-  headers.push_back(pair<string, string>("HTTP_DATE", date_str));
-
-  string canonical_header;
-  map<string, string> meta_map;
-  map<string, string> sub_resources;
-  rgw_create_s3_canonical_header(method, NULL, NULL, date_str.c_str(),
-                            meta_map, meta_map, new_url.c_str(), sub_resources,
-                            canonical_header);
-
-  string digest;
-  try {
-    digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
-  } catch (int ret) {
-    return ret;
-  }
-
-  string auth_hdr = "AWS " + key.id + ":" + digest;
-
-  ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
-
-  headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
-  int r = process(method, new_url.c_str());
-  if (r < 0)
-    return r;
-
-  return status;
+  struct tm timeInfo;
+  gmtime_r(&rawtime, &timeInfo);
+  strftime(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S %z", &timeInfo);  
+  
+  date_str = buffer;
 }
 
-int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len, bool* pause)
 {
   if (!send_iter)
     return 0;
@@ -158,7 +131,7 @@ int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
   return len;
 }
 
-int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
 {
   size_t cp_len, left_len;
 
@@ -172,10 +145,9 @@ int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
   response.append(p);
 
   return 0;
-
 }
 
-void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
+static void append_param(string& dest, const string& name, const string& val)
 {
   if (dest.empty()) {
     dest.append("?");
@@ -194,38 +166,54 @@ void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const
   }
 }
 
-void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
+static void do_get_params_str(const param_vec_t& params, map<string, string>& extra_args, string& dest)
 {
   map<string, string>::iterator miter;
   for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
     append_param(dest, miter->first, miter->second);
   }
-  param_vec_t::iterator iter;
-  for (iter = params.begin(); iter != params.end(); ++iter) {
+  for (auto iter = params.begin(); iter != params.end(); ++iter) {
     append_param(dest, iter->first, iter->second);
   }
 }
 
-int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
+void RGWHTTPSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
+{
+  do_get_params_str(params, extra_args, dest);
+}
+
+void RGWHTTPSimpleRequest::get_out_headers(map<string, string> *pheaders)
+{
+  unique_lock guard(out_headers_lock);
+  pheaders->swap(out_headers);
+  out_headers.clear();
+}
+
+static int sign_request_v2(const DoutPrefixProvider *dpp, const RGWAccessKey& key,
+                        const string& region, const string& service,
+                        RGWEnv& env, req_info& info,
+                        const bufferlist *opt_content)
 {
   /* don't sign if no key is provided */
   if (key.key.empty()) {
     return 0;
   }
 
-  if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+  auto cct = dpp->get_cct();
+
+  if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
     for (const auto& i: env.get_map()) {
-      ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
+      ldpp_dout(dpp, 20) << __func__ << "():> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
     }
   }
 
   string canonical_header;
-  if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
-    ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
+  if (!rgw_create_s3_canonical_header(dpp, info, NULL, canonical_header, false)) {
+    ldpp_dout(dpp, 0) << "failed to create canonical s3 header" << dendl;
     return -EINVAL;
   }
 
-  ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
+  ldpp_dout(dpp, 10) << "generated canonical header: " << canonical_header << dendl;
 
   string digest;
   try {
@@ -235,14 +223,147 @@ int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info&
   }
 
   string auth_hdr = "AWS " + key.id + ":" + digest;
-  ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
-  
+  ldpp_dout(dpp, 15) << "generated auth header: " << auth_hdr << dendl;
+
   env.set("AUTHORIZATION", auth_hdr);
 
   return 0;
 }
 
-int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
+static int sign_request_v4(const DoutPrefixProvider *dpp, const RGWAccessKey& key,
+                           const string& region, const string& service,
+                           RGWEnv& env, req_info& info,
+                           const bufferlist *opt_content)
+{
+  /* don't sign if no key is provided */
+  if (key.key.empty()) {
+    return 0;
+  }
+
+  auto cct = dpp->get_cct();
+
+  if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+    for (const auto& i: env.get_map()) {
+      ldpp_dout(dpp, 20) << __func__ << "():> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
+    }
+  }
+
+  rgw::auth::s3::AWSSignerV4::prepare_result_t sigv4_data;
+  if (service == "s3") {
+    sigv4_data = rgw::auth::s3::AWSSignerV4::prepare(dpp, key.id, region, service, info, opt_content, true);
+  } else {
+    sigv4_data = rgw::auth::s3::AWSSignerV4::prepare(dpp, key.id, region, service, info, opt_content, false);
+  }
+  auto sigv4_headers = sigv4_data.signature_factory(dpp, key.key, sigv4_data);
+
+  for (auto& entry : sigv4_headers) {
+    ldpp_dout(dpp, 20) << __func__ << "(): sigv4 header: " << entry.first << ": " << entry.second << dendl;
+    env.set(entry.first, entry.second);
+  }
+
+  return 0;
+}
+
+static int sign_request(const DoutPrefixProvider *dpp, const RGWAccessKey& key,
+                        const string& region, const string& service,
+                        RGWEnv& env, req_info& info,
+                        const bufferlist *opt_content)
+{
+  auto authv = dpp->get_cct()->_conf.get_val<int64_t>("rgw_s3_client_max_sig_ver");
+  if (authv > 0 &&
+      authv <= 3) {
+    return sign_request_v2(dpp, key, region, service, env, info, opt_content);
+  }
+
+  return sign_request_v4(dpp, key, region, service, env, info, opt_content);
+}
+
+static string extract_region_name(string&& s)
+{
+  if (s == "s3") {
+      return "us-east-1";
+  }
+  if (boost::algorithm::starts_with(s, "s3-")) {
+    return s.substr(3);
+  }
+  return std::move(s);
+}
+
+
+static bool identify_scope(const DoutPrefixProvider *dpp,
+                           CephContext *cct,
+                           const string& host,
+                           string *region,
+                           string& service)
+{
+  if (!boost::algorithm::ends_with(host, "amazonaws.com")) {
+    ldpp_dout(dpp, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl;
+    return false;
+  }
+
+  vector<string> vec;
+
+  get_str_vec(host, ".", vec);
+
+  string ser = service;
+  if (service.empty()) {
+    service = "s3"; /* default */
+  }
+
+  for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
+    auto& s = *iter;
+    if (s == "s3" ||
+        s == "execute-api" ||
+        s == "iam") {
+      if (s == "execute-api") {
+        service = s;
+      }
+      ++iter;
+      if (iter == vec.end()) {
+        ldpp_dout(dpp, 0) << "WARNING: cannot identify region name from host name: " << host << dendl;
+        return false;
+      }
+      auto& next = *iter;
+      if (next == "amazonaws") {
+        *region = "us-east-1";
+        return true;
+      }
+      *region = next;
+      return true;
+    } else if (boost::algorithm::starts_with(s, "s3-")) {
+      *region = extract_region_name(std::move(s));
+      return true;
+    }
+  }
+
+  return false;
+}
+
+static void scope_from_api_name(const DoutPrefixProvider *dpp,
+                                CephContext *cct,
+                                const string& host,
+                                std::optional<string> api_name,
+                                string *region,
+                                string& service)
+{
+  if (api_name && service.empty()) {
+    *region = *api_name;
+    service = "s3";
+    return;
+  }
+
+  if (!identify_scope(dpp, cct, host, region, service)) {
+    if (service == "iam") {
+      *region = cct->_conf->rgw_zonegroup;
+    } else {
+      *region = cct->_conf->rgw_zonegroup;
+      service = "s3";
+    }
+    return;
+  }
+}
+
+int RGWRESTSimpleRequest::forward_request(const DoutPrefixProvider *dpp, const RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y, std::string service)
 {
 
   string date_str;
@@ -251,20 +372,55 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
   RGWEnv new_env;
   req_info new_info(cct, &new_env);
   new_info.rebuild_from(info);
+  string bucket_encode;
+  string request_uri_encode;
+  size_t pos = new_info.request_uri.substr(1, new_info.request_uri.size() - 1).find("/");
+  string bucket = new_info.request_uri.substr(1, pos);
+  url_encode(bucket, bucket_encode);
+  if (std::string::npos != pos)
+    request_uri_encode = string("/") + bucket_encode + new_info.request_uri.substr(pos + 1);
+  else
+    request_uri_encode = string("/") + bucket_encode;
+  new_info.request_uri = request_uri_encode;
+
+  for (auto& param : params) {
+    new_info.args.append(param.first, param.second);
+  }
 
   new_env.set("HTTP_DATE", date_str.c_str());
+  const char* const content_md5 = info.env->get("HTTP_CONTENT_MD5");
+  if (content_md5) {
+    new_env.set("HTTP_CONTENT_MD5", content_md5);
+  }
+
+  string region;
+  string s;
+  if (!service.empty()) {
+    s = service;
+  }
+
+  scope_from_api_name(dpp, cct, host, api_name, &region, s);
+
+  const char *maybe_payload_hash = info.env->get("HTTP_X_AMZ_CONTENT_SHA256");
+  if (maybe_payload_hash && s != "iam") {
+    new_env.set("HTTP_X_AMZ_CONTENT_SHA256", maybe_payload_hash);
+  }
 
-  int ret = sign_request(key, new_env, new_info);
+  int ret = sign_request(dpp, key, region, s, new_env, new_info, nullptr);
   if (ret < 0) {
-    ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+    ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
     return ret;
   }
 
+  if (s == "iam") {
+    info.args.remove("PayloadHash");
+  }
+
   for (const auto& kv: new_env.get_map()) {
     headers.emplace_back(kv);
   }
 
-  map<string, string>& meta_map = new_info.x_meta_map;
+  meta_map_t& meta_map = new_info.x_meta_map;
   for (const auto& kv: meta_map) {
     headers.emplace_back(kv);
   }
@@ -292,7 +448,10 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
     set_send_length(inbl->length());
   }
 
-  int r = process(new_info.method, new_url.c_str());
+  method = new_info.method;
+  url = new_url;
+
+  int r = process(y);
   if (r < 0){
     if (r == -EINVAL){
       // curl_easy has errored, generally means the service is not available
@@ -304,16 +463,16 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
   response.append((char)0); /* NULL terminate response */
 
   if (outbl) {
-    outbl->claim(response);
+    *outbl = std::move(response);
   }
 
   return status;
 }
 
 class RGWRESTStreamOutCB : public RGWGetDataCB {
-  RGWRESTStreamWriteRequest *req;
+  RGWRESTStreamS3PutObj *req;
 public:
-  explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
+  explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
 };
 
@@ -321,34 +480,21 @@ int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
   if (!bl_ofs && bl_len == bl.length()) {
-    return req->add_output_data(bl);
+    req->add_send_data(bl);
+    return 0;
   }
 
   bufferptr bp(bl.c_str() + bl_ofs, bl_len);
   bufferlist new_bl;
   new_bl.push_back(bp);
 
-  return req->add_output_data(new_bl);
+  req->add_send_data(new_bl);
+  return 0;
 }
 
-RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
+RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
 {
-  delete cb;
-}
-
-int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
-{
-  lock.Lock();
-  if (status < 0) {
-    int ret = status;
-    lock.Unlock();
-    return ret;
-  }
-  pending_send.push_back(bl);
-  lock.Unlock();
-
-  bool done;
-  return http_manager.process_requests(false, &done);
+  delete out_cb;
 }
 
 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
@@ -408,7 +554,7 @@ static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm,
   }
 }
 
-static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
+static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
 {
   struct grant_type_to_header *t;
 
@@ -421,52 +567,110 @@ static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string
   }
 }
 
-int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
+RGWRESTGenerateHTTPHeaders::RGWRESTGenerateHTTPHeaders(CephContext *_cct, RGWEnv *_env, req_info *_info) :
+                                                DoutPrefix(_cct, dout_subsys, "rest gen http headers: "),
+                                                cct(_cct),
+                                                new_env(_env),
+                                                new_info(_info) {
+}
+
+void RGWRESTGenerateHTTPHeaders::init(const string& _method, const string& host,
+                                      const string& resource_prefix, const string& _url,
+                                      const string& resource, const param_vec_t& params,
+                                      std::optional<string> api_name)
 {
-  string resource = obj.bucket.name + "/" + obj.get_oid();
-  string new_url = url;
-  if (new_url[new_url.size() - 1] != '/')
-    new_url.append("/");
+  scope_from_api_name(this, cct, host, api_name, &region, service);
+
+  string params_str;
+  map<string, string>& args = new_info->args.get_params();
+  do_get_params_str(params, args, params_str);
+
+  /* merge params with extra args so that we can sign correctly */
+  for (auto iter = params.begin(); iter != params.end(); ++iter) {
+    new_info->args.append(iter->first, iter->second);
+  }
+
+  url = _url + resource + params_str;
 
   string date_str;
-  get_new_date_str(date_str);
+  get_gmt_date_str(date_str);
 
-  RGWEnv new_env;
-  req_info new_info(cct, &new_env);
-  
-  string params_str;
-  map<string, string>& args = new_info.args.get_params();
-  get_params_str(args, params_str);
+  new_env->set("HTTP_DATE", date_str.c_str());
+  new_env->set("HTTP_HOST", host);
 
-  new_url.append(resource + params_str);
+  method = _method;
+  new_info->method = method.c_str();
+  new_info->host = host;
 
-  new_env.set("HTTP_DATE", date_str.c_str());
+  new_info->script_uri = "/";
+  new_info->script_uri.append(resource_prefix);
+  new_info->script_uri.append(resource);
+  new_info->request_uri = new_info->script_uri;
+}
 
-  new_info.method = "PUT";
+static bool is_x_amz(const string& s) {
+  return boost::algorithm::starts_with(s, "x-amz-");
+}
 
-  new_info.script_uri = "/";
-  new_info.script_uri.append(resource);
-  new_info.request_uri = new_info.script_uri;
+void RGWRESTGenerateHTTPHeaders::set_extra_headers(const map<string, string>& extra_headers)
+{
+  for (auto iter : extra_headers) {
+    const string& name = lowercase_dash_http_attr(iter.first);
+    new_env->set(name, iter.second.c_str());
+    if (is_x_amz(name)) {
+      new_info->x_meta_map[name] = iter.second;
+    }
+  }
+}
+
+int RGWRESTGenerateHTTPHeaders::set_obj_attrs(const DoutPrefixProvider *dpp, map<string, bufferlist>& rgw_attrs)
+{
+  map<string, string> new_attrs;
 
   /* merge send headers */
-  for (auto& attr: attrs) {
+  for (auto& attr: rgw_attrs) {
     bufferlist& bl = attr.second;
     const string& name = attr.first;
     string val = bl.c_str();
     if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
       string header_name = RGW_AMZ_META_PREFIX;
       header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
-      new_env.set(header_name, val);
-      new_info.x_meta_map[header_name] = val;
+      new_attrs[header_name] = val;
     }
   }
+
   RGWAccessControlPolicy policy;
-  int ret = rgw_policy_from_attrset(cct, attrs, &policy);
+  int ret = rgw_policy_from_attrset(dpp, cct, rgw_attrs, &policy);
   if (ret < 0) {
-    ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
+    ldpp_dout(dpp, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
     return ret;
   }
 
+  set_http_attrs(new_attrs);
+  set_policy(policy);
+
+  return 0;
+}
+
+void RGWRESTGenerateHTTPHeaders::set_http_attrs(const map<string, string>& http_attrs)
+{
+  /* merge send headers */
+  for (auto& attr: http_attrs) {
+    const string& val = attr.second;
+    const string& name = lowercase_dash_http_attr(attr.first);
+    if (is_x_amz(name)) {
+      new_env->set(name, val);
+      new_info->x_meta_map[name] = val;
+    } else {
+      new_env->set(attr.first, val); /* Ugh, using the uppercase representation,
+                                       as the signing function calls info.env.get("CONTENT_TYPE").
+                                       This needs to be cleaned up! */
+    }
+  }
+}
+
+void RGWRESTGenerateHTTPHeaders::set_policy(RGWAccessControlPolicy& policy)
+{
   /* update acl headers */
   RGWAccessControlList& acl = policy.get_acl();
   multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
@@ -477,73 +681,82 @@ int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uin
     ACLPermission& perm = grant.get_permission();
     grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
   }
-  add_grants_headers(grants_by_type, new_env, new_info.x_meta_map);
-  ret = sign_request(key, new_env, new_info);
+  add_grants_headers(grants_by_type, *new_env, new_info->x_meta_map);
+}
+
+int RGWRESTGenerateHTTPHeaders::sign(const DoutPrefixProvider *dpp, RGWAccessKey& key, const bufferlist *opt_content)
+{
+  int ret = sign_request(dpp, key, region, service, *new_env, *new_info, opt_content);
   if (ret < 0) {
-    ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+    ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
     return ret;
   }
 
-  for (const auto& kv: new_env.get_map()) {
-    headers.emplace_back(kv);
+  return 0;
+}
+
+void RGWRESTStreamS3PutObj::send_init(rgw::sal::Object* obj)
+{
+  string resource_str;
+  string resource;
+  string new_url = url;
+  string new_host = host;
+
+   const auto& bucket_name = obj->get_bucket()->get_name();
+
+  if (host_style == VirtualStyle) {
+    resource_str = obj->get_oid();
+
+    new_url = bucket_name + "."  + new_url;
+    new_host = bucket_name + "." + new_host;
+  } else {
+    resource_str = bucket_name + "/" + obj->get_oid();
   }
 
-  cb = new RGWRESTStreamOutCB(this);
+  //do not encode slash in object key name
+  url_encode(resource_str, resource, false);
 
-  set_send_length(obj_size);
+  if (new_url[new_url.size() - 1] != '/')
+    new_url.append("/");
 
-  int r = http_manager.add_request(this, new_info.method, new_url.c_str());
-  if (r < 0)
-    return r;
+  method = "PUT";
+  headers_gen.init(method, new_host, resource_prefix, new_url, resource, params, api_name);
 
-  return 0;
+  url = headers_gen.get_url();
 }
 
-int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
+void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, bufferlist>& rgw_attrs)
 {
-  uint64_t sent = 0;
-
-  dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
-  lock.Lock();
-  if (pending_send.empty() || status < 0) {
-    lock.Unlock();
-    return status;
-  }
+  headers_gen.set_obj_attrs(dpp, rgw_attrs);
 
-  list<bufferlist>::iterator iter = pending_send.begin();
-  while (iter != pending_send.end() && len > 0) {
-    bufferlist& bl = *iter;
-    
-    list<bufferlist>::iterator next_iter = iter;
-    ++next_iter;
-    lock.Unlock();
-
-    uint64_t send_len = min(len, (size_t)bl.length());
+  send_ready(dpp, key);
+}
 
-    memcpy(ptr, bl.c_str(), send_len);
+void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, const map<string, string>& http_attrs,
+                                       RGWAccessControlPolicy& policy)
+{
+  headers_gen.set_http_attrs(http_attrs);
+  headers_gen.set_policy(policy);
 
-    ptr = (char *)ptr + send_len;
-    len -= send_len;
-    sent += send_len;
+  send_ready(dpp, key);
+}
 
-    lock.Lock();
+void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key)
+{
+  headers_gen.sign(dpp, key, nullptr);
 
-    bufferlist new_bl;
-    if (bl.length() > send_len) {
-      bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
-      new_bl.append(bp);
-    }
-    pending_send.pop_front(); /* need to do this after we copy data from bl */
-    if (new_bl.length()) {
-      pending_send.push_front(new_bl);
-    }
-    iter = next_iter;
+  for (const auto& kv: new_env.get_map()) {
+    headers.emplace_back(kv);
   }
-  lock.Unlock();
 
-  return sent;
+  out_cb = new RGWRESTStreamOutCB(this);
 }
 
+void RGWRESTStreamS3PutObj::put_obj_init(const DoutPrefixProvider *dpp, RGWAccessKey& key, rgw::sal::Object* obj, map<string, bufferlist>& attrs)
+{
+  send_init(obj);
+  send_ready(dpp, key, attrs);
+}
 
 void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
 {
@@ -555,7 +768,7 @@ void set_str_from_headers(map<string, string>& out_headers, const string& header
   }
 }
 
-static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
+static int parse_rgwx_mtime(const DoutPrefixProvider *dpp, CephContext *cct, const string& s, ceph::real_time *rt)
 {
   string err;
   vector<string> vec;
@@ -569,14 +782,14 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *
   long secs = strict_strtol(vec[0].c_str(), 10, &err);
   long nsecs = 0;
   if (!err.empty()) {
-    ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
+    ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
     return -EINVAL;
   }
 
   if (vec.size() > 1) {
     nsecs = strict_strtol(vec[1].c_str(), 10, &err);
     if (!err.empty()) {
-      ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
+      ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
       return -EINVAL;
     }
   }
@@ -586,127 +799,167 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *
   return 0;
 }
 
-int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
+static void send_prepare_convert(const rgw_obj& obj, string *resource)
 {
-  int ret = http_manager.complete_requests();
-  if (ret < 0)
-    return ret;
+  string urlsafe_bucket, urlsafe_object;
+  url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
+  url_encode(obj.key.name, urlsafe_object);
+  *resource = urlsafe_bucket + "/" + urlsafe_object;
+}
 
-  set_str_from_headers(out_headers, "ETAG", etag);
+int RGWRESTStreamRWRequest::send_request(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
+{
+  string resource;
+  send_prepare_convert(obj, &resource);
 
-  if (mtime) {
-    string mtime_str;
-    set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
+  return send_request(dpp, &key, extra_headers, resource, mgr);
+}
 
-    ret = parse_rgwx_mtime(cct, mtime_str, mtime);
-    if (ret < 0) {
-      return ret;
-    }
-  }
-  return status;
+int RGWRESTStreamRWRequest::send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
+{
+  string resource;
+  send_prepare_convert(obj, &resource);
+
+  return do_send_prepare(dpp, &key, extra_headers, resource);
 }
 
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+                                           bufferlist *send_data)
 {
-  string urlsafe_bucket, urlsafe_object;
-  url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
-  url_encode(obj.key.name, urlsafe_object);
-  string resource = urlsafe_bucket + "/" + urlsafe_object;
+  string new_resource;
+  //do not encode slash
+  url_encode(resource, new_resource, false);
 
-  return send_request(&key, extra_headers, resource, nullptr, mgr);
+  return do_send_prepare(dpp, key, extra_headers, new_resource, send_data);
 }
 
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
-                                         bufferlist *send_data, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::do_send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+                                         bufferlist *send_data)
 {
   string new_url = url;
-  if (new_url[new_url.size() - 1] != '/')
+  if (!new_url.empty() && new_url.back() != '/')
     new_url.append("/");
-
-  string date_str;
-  get_new_date_str(date_str);
-
-  RGWEnv new_env;
-  req_info new_info(cct, &new_env);
   
-  string params_str;
-  map<string, string>& args = new_info.args.get_params();
-  get_params_str(args, params_str);
-
-  /* merge params with extra args so that we can sign correctly */
-  for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
-    new_info.args.append(iter->first, iter->second);
-  }
-
   string new_resource;
+  string bucket_name;
+  string old_resource = resource;
+
   if (resource[0] == '/') {
     new_resource = resource.substr(1);
   } else {
     new_resource = resource;
   }
 
-  new_url.append(new_resource + params_str);
+  size_t pos = new_resource.find("/");
+  bucket_name = new_resource.substr(0, pos);
 
-  new_env.set("HTTP_DATE", date_str.c_str());
+  //when dest is a bucket with out other params, uri should end up with '/'
+  if(pos == string::npos && params.size() == 0 && host_style == VirtualStyle) {
+    new_resource.append("/");
+  }
 
-  for (map<string, string>::iterator iter = extra_headers.begin();
-       iter != extra_headers.end(); ++iter) {
-    new_env.set(iter->first.c_str(), iter->second.c_str());
+  if (host_style == VirtualStyle) {
+    new_url = protocol + "://" + bucket_name + "." + host;
+    if(pos == string::npos) {
+      new_resource = "";
+    } else {
+      new_resource = new_resource.substr(pos+1);
+    }
   }
 
-  new_info.method = method;
+  headers_gen.emplace(cct, &new_env, &new_info);
 
-  new_info.script_uri = "/";
-  new_info.script_uri.append(new_resource);
-  new_info.request_uri = new_info.script_uri;
+  headers_gen->init(method, host, resource_prefix, new_url, new_resource, params, api_name);
 
-  new_info.init_meta_info(NULL);
+  headers_gen->set_http_attrs(extra_headers);
 
   if (key) {
-    int ret = sign_request(*key, new_env, new_info);
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
-      return ret;
-    }
+    sign_key = *key;
   }
 
-  for (const auto& kv: new_env.get_map()) {
-    headers.emplace_back(kv);
+  if (send_data) {
+    set_send_length(send_data->length());
+    set_outbl(*send_data);
+    set_send_data_hint(true);
   }
 
-  bool send_data_hint = false;
-  if (send_data) {
-    outbl.claim(*send_data);
-    send_data_hint = true;
+  method = new_info.method;
+  url = headers_gen->get_url();
+
+  return 0;
+}
+
+int RGWRESTStreamRWRequest::send_request(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+                                         RGWHTTPManager *mgr, bufferlist *send_data)
+{
+  int ret = send_prepare(dpp, key, extra_headers, resource, send_data);
+  if (ret < 0) {
+    return ret;
   }
 
-  RGWHTTPManager *pmanager = &http_manager;
-  if (mgr) {
-    pmanager = mgr;
+  return send(mgr);
+}
+
+
+int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
+{
+  if (!headers_gen) {
+    ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): send_prepare() was not called: likey a bug!" << dendl;
+    return -EINVAL;
   }
 
-  int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
-  if (r < 0)
-    return r;
+  const bufferlist *outblp{nullptr};
 
-  if (!mgr) {
-    r = pmanager->complete_requests();
-    if (r < 0)
+  if (send_len == outbl.length()) {
+    outblp = &outbl;
+  }
+
+  if (sign_key) {
+    int r = headers_gen->sign(this, *sign_key, outblp);
+    if (r < 0) {
+      ldpp_dout(this, 0) << "ERROR: failed to sign request" << dendl;
       return r;
+    }
+  }
+
+  for (const auto& kv: new_env.get_map()) {
+    headers.emplace_back(kv);
   }
 
+  if (!mgr) {
+    return RGWHTTP::send(this);
+  }
+
+  int r = mgr->add_request(this);
+  if (r < 0)
+    return r;
+
   return 0;
 }
 
-int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
+int RGWHTTPStreamRWRequest::complete_request(optional_yield y,
+                                             string *etag,
+                                             real_time *mtime,
+                                             uint64_t *psize,
+                                             map<string, string> *pattrs,
+                                             map<string, string> *pheaders)
 {
-  set_str_from_headers(out_headers, "ETAG", etag);
+  int ret = wait(y);
+  if (ret < 0) {
+    return ret;
+  }
+
+  unique_lock guard(out_headers_lock);
+
+  if (etag) {
+    set_str_from_headers(out_headers, "ETAG", *etag);
+  }
   if (status >= 0) {
     if (mtime) {
       string mtime_str;
       set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
       if (!mtime_str.empty()) {
-        int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
+        int ret = parse_rgwx_mtime(this, cct, mtime_str, mtime);
         if (ret < 0) {
           return ret;
         }
@@ -720,14 +973,13 @@ int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uin
       string err;
       *psize = strict_strtoll(size_str.c_str(), 10, &err);
       if (!err.empty()) {
-        ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
+        ldpp_dout(this, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
         return -EIO;
       }
     }
   }
 
-  map<string, string>::iterator iter;
-  for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
+  for (auto iter = out_headers.begin(); pattrs && iter != out_headers.end(); ++iter) {
     const string& attr_name = iter->first;
     if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
       string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
@@ -744,19 +996,23 @@ int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uin
         }
       }
       *dest = '\0';
-      attrs[buf] = iter->second;
+      (*pattrs)[buf] = iter->second;
     }
   }
+
+  if (pheaders) {
+    *pheaders = std::move(out_headers);
+  }
   return status;
 }
 
-int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
+int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val)
 {
   if (name == "RGWX_EMBEDDED_METADATA_LEN") {
     string err;
     long len = strict_strtol(val.c_str(), 10, &err);
     if (!err.empty()) {
-      ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
+      ldpp_dout(this, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
       return -EINVAL;
     }
 
@@ -765,39 +1021,99 @@ int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
   return 0;
 }
 
-int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
 {
-  bufferptr bp((const char *)ptr, len);
-  bufferlist bl;
-  bl.append(bp);
-  int ret = cb->handle_data(bl, ofs, len);
-  if (ret < 0)
-    return ret;
+  size_t orig_len = len;
+
+  if (cb) {
+    in_data.append((const char *)ptr, len);
+
+    size_t orig_in_data_len = in_data.length();
+
+    int ret = cb->handle_data(in_data, pause);
+    if (ret < 0)
+      return ret;
+    if (ret == 0) {
+      in_data.clear();
+    } else {
+      /* partial read */
+      ceph_assert(in_data.length() <= orig_in_data_len);
+      len = ret;
+      bufferlist bl;
+      size_t left_to_read = orig_in_data_len - len;
+      if (in_data.length() > left_to_read) {
+        in_data.splice(0, in_data.length() - left_to_read, &bl);
+      }
+    }
+  }
   ofs += len;
-  return len;
+  return orig_len;
+}
+
+void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
+  std::lock_guard wl{write_lock};
+  stream_writes = s;
 }
 
-int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
+void RGWHTTPStreamRWRequest::unpause_receive()
 {
-  if (outbl.length() == 0) {
-    return 0;
+  std::lock_guard req_locker{get_req_lock()};
+  if (!read_paused) {
+    _set_read_paused(false);
   }
+}
 
-  uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
-  if (send_size > 0) {
-    memcpy(ptr, outbl.c_str() + write_ofs, send_size);
-    write_ofs += send_size;
-  }
-  return send_size;
+void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
+{
+  std::scoped_lock locker{get_req_lock(), write_lock};
+  outbl.claim_append(bl);
+  _set_write_paused(false);
 }
 
-class StreamIntoBufferlist : public RGWGetDataCB {
-  bufferlist& bl;
-public:
-  StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
-  int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
-    bl.claim_append(inbl);
-    return bl_len;
-  }
-};
+uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
+{
+  std::lock_guard wl{write_lock};
+  return outbl.length();
+}
+
+void RGWHTTPStreamRWRequest::finish_write()
+{
+  std::scoped_lock locker{get_req_lock(), write_lock};
+  write_stream_complete = true;
+  _set_write_paused(false);
+}
+
+int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
+{
+  uint64_t out_len;
+  uint64_t send_size;
+  {
+    std::lock_guard wl{write_lock};
+
+    if (outbl.length() == 0) {
+      if ((stream_writes && !write_stream_complete) ||
+          (write_ofs < send_len)) {
+        *pause = true;
+      }
+      return 0;
+    }
+
+    len = std::min(len, (size_t)outbl.length());
+
+    bufferlist bl;
+    outbl.splice(0, len, &bl);
+    send_size = bl.length();
+    if (send_size > 0) {
+      memcpy(ptr, bl.c_str(), send_size);
+      write_ofs += send_size;
+    }
 
+    out_len = outbl.length();
+  }
+  /* don't need to be under write_lock here, avoid deadlocks in case notify callback
+   * needs to lock */
+  if (write_drain_cb) {
+    write_drain_cb->notify(out_len);
+  }
+  return send_size;
+}