// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
#include "rgw_common.h"
#include "rgw_rest_client.h"
#include "rgw_auth_s3.h"
#include "rgw_http_errors.h"
-#include "rgw_rados.h"
-#include "common/ceph_crypto_cms.h"
#include "common/armor.h"
#include "common/strtol.h"
#include "include/str_list.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-int RGWRESTSimpleRequest::get_status()
+using namespace std;
+
+int RGWHTTPSimpleRequest::get_status()
{
int retcode = get_req_retcode();
if (retcode < 0) {
return status;
}
-int RGWRESTSimpleRequest::handle_header(const string& name, const string& val)
+int RGWHTTPSimpleRequest::handle_header(const string& name, const string& val)
{
if (name == "CONTENT_LENGTH") {
string err;
long len = strict_strtol(val.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
return -EINVAL;
}
return 0;
}
-int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len)
{
+ unique_lock guard(out_headers_lock);
+
char line[len + 1];
char *s = (char *)ptr, *end = (char *)ptr + len;
char *p = line;
- ldout(cct, 10) << "receive_http_header" << dendl;
+ ldpp_dout(this, 30) << "receive_http_header" << dendl;
while (s != end) {
if (*s == '\r') {
}
if (*s == '\n') {
*p = '\0';
- ldout(cct, 10) << "received header:" << line << dendl;
+ ldpp_dout(this, 30) << "received header:" << line << dendl;
// TODO: fill whatever data required here
char *l = line;
char *tok = strsep(&l, " \t:");
date_str = rgw_to_asctime(ceph_clock_now());
}
-int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
+static void get_gmt_date_str(string& date_str)
{
- string new_url = url;
- string new_resource = resource;
+ auto now_time = ceph::real_clock::now();
+ time_t rawtime = ceph::real_clock::to_time_t(now_time);
- if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
- new_url = new_url.substr(0, new_url.size() - 1);
- } else if (resource[0] != '/') {
- new_resource = "/";
- new_resource.append(resource);
- }
- new_url.append(new_resource);
+ char buffer[80];
- string date_str;
- get_new_date_str(date_str);
- headers.push_back(pair<string, string>("HTTP_DATE", date_str));
-
- string canonical_header;
- map<string, string> meta_map;
- map<string, string> sub_resources;
- rgw_create_s3_canonical_header(method, NULL, NULL, date_str.c_str(),
- meta_map, new_url.c_str(), sub_resources,
- canonical_header);
-
- string digest;
- try {
- digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
- } catch (int ret) {
- return ret;
- }
-
- string auth_hdr = "AWS " + key.id + ":" + digest;
-
- ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
-
- headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
- int r = process(method, new_url.c_str());
- if (r < 0)
- return r;
-
- return status;
+ struct tm timeInfo;
+ gmtime_r(&rawtime, &timeInfo);
+ strftime(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S %z", &timeInfo);
+
+ date_str = buffer;
}
-int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len, bool* pause)
{
if (!send_iter)
return 0;
return len;
}
-int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
{
size_t cp_len, left_len;
response.append(p);
return 0;
-
}
-void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
+static void append_param(string& dest, const string& name, const string& val)
{
if (dest.empty()) {
dest.append("?");
}
}
-void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
+static void do_get_params_str(const param_vec_t& params, map<string, string>& extra_args, string& dest)
{
map<string, string>::iterator miter;
for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
append_param(dest, miter->first, miter->second);
}
- param_vec_t::iterator iter;
- for (iter = params.begin(); iter != params.end(); ++iter) {
+ for (auto iter = params.begin(); iter != params.end(); ++iter) {
append_param(dest, iter->first, iter->second);
}
}
-int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
+void RGWHTTPSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
+{
+ do_get_params_str(params, extra_args, dest);
+}
+
+void RGWHTTPSimpleRequest::get_out_headers(map<string, string> *pheaders)
+{
+ unique_lock guard(out_headers_lock);
+ pheaders->swap(out_headers);
+ out_headers.clear();
+}
+
+static int sign_request_v2(const DoutPrefixProvider *dpp, const RGWAccessKey& key,
+ const string& region, const string& service,
+ RGWEnv& env, req_info& info,
+ const bufferlist *opt_content)
{
/* don't sign if no key is provided */
if (key.key.empty()) {
return 0;
}
- if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+ auto cct = dpp->get_cct();
+
+ if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
for (const auto& i: env.get_map()) {
- ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "():> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
}
}
string canonical_header;
- if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
- ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
+ if (!rgw_create_s3_canonical_header(dpp, info, NULL, canonical_header, false)) {
+ ldpp_dout(dpp, 0) << "failed to create canonical s3 header" << dendl;
return -EINVAL;
}
- ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
+ ldpp_dout(dpp, 10) << "generated canonical header: " << canonical_header << dendl;
string digest;
try {
}
string auth_hdr = "AWS " + key.id + ":" + digest;
- ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
-
+ ldpp_dout(dpp, 15) << "generated auth header: " << auth_hdr << dendl;
+
env.set("AUTHORIZATION", auth_hdr);
return 0;
}
-int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
+static int sign_request_v4(const DoutPrefixProvider *dpp, const RGWAccessKey& key,
+ const string& region, const string& service,
+ RGWEnv& env, req_info& info,
+ const bufferlist *opt_content)
+{
+ /* don't sign if no key is provided */
+ if (key.key.empty()) {
+ return 0;
+ }
+
+ auto cct = dpp->get_cct();
+
+ if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+ for (const auto& i: env.get_map()) {
+ ldpp_dout(dpp, 20) << __func__ << "():> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
+ }
+ }
+
+ rgw::auth::s3::AWSSignerV4::prepare_result_t sigv4_data;
+ if (service == "s3") {
+ sigv4_data = rgw::auth::s3::AWSSignerV4::prepare(dpp, key.id, region, service, info, opt_content, true);
+ } else {
+ sigv4_data = rgw::auth::s3::AWSSignerV4::prepare(dpp, key.id, region, service, info, opt_content, false);
+ }
+ auto sigv4_headers = sigv4_data.signature_factory(dpp, key.key, sigv4_data);
+
+ for (auto& entry : sigv4_headers) {
+ ldpp_dout(dpp, 20) << __func__ << "(): sigv4 header: " << entry.first << ": " << entry.second << dendl;
+ env.set(entry.first, entry.second);
+ }
+
+ return 0;
+}
+
+static int sign_request(const DoutPrefixProvider *dpp, const RGWAccessKey& key,
+ const string& region, const string& service,
+ RGWEnv& env, req_info& info,
+ const bufferlist *opt_content)
+{
+ auto authv = dpp->get_cct()->_conf.get_val<int64_t>("rgw_s3_client_max_sig_ver");
+ if (authv > 0 &&
+ authv <= 3) {
+ return sign_request_v2(dpp, key, region, service, env, info, opt_content);
+ }
+
+ return sign_request_v4(dpp, key, region, service, env, info, opt_content);
+}
+
+static string extract_region_name(string&& s)
+{
+ if (s == "s3") {
+ return "us-east-1";
+ }
+ if (boost::algorithm::starts_with(s, "s3-")) {
+ return s.substr(3);
+ }
+ return std::move(s);
+}
+
+
+static bool identify_scope(const DoutPrefixProvider *dpp,
+ CephContext *cct,
+ const string& host,
+ string *region,
+ string& service)
+{
+ if (!boost::algorithm::ends_with(host, "amazonaws.com")) {
+ ldpp_dout(dpp, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl;
+ return false;
+ }
+
+ vector<string> vec;
+
+ get_str_vec(host, ".", vec);
+
+ string ser = service;
+ if (service.empty()) {
+ service = "s3"; /* default */
+ }
+
+ for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
+ auto& s = *iter;
+ if (s == "s3" ||
+ s == "execute-api" ||
+ s == "iam") {
+ if (s == "execute-api") {
+ service = s;
+ }
+ ++iter;
+ if (iter == vec.end()) {
+ ldpp_dout(dpp, 0) << "WARNING: cannot identify region name from host name: " << host << dendl;
+ return false;
+ }
+ auto& next = *iter;
+ if (next == "amazonaws") {
+ *region = "us-east-1";
+ return true;
+ }
+ *region = next;
+ return true;
+ } else if (boost::algorithm::starts_with(s, "s3-")) {
+ *region = extract_region_name(std::move(s));
+ return true;
+ }
+ }
+
+ return false;
+}
+
+static void scope_from_api_name(const DoutPrefixProvider *dpp,
+ CephContext *cct,
+ const string& host,
+ std::optional<string> api_name,
+ string *region,
+ string& service)
+{
+ if (api_name && service.empty()) {
+ *region = *api_name;
+ service = "s3";
+ return;
+ }
+
+ if (!identify_scope(dpp, cct, host, region, service)) {
+ if (service == "iam") {
+ *region = cct->_conf->rgw_zonegroup;
+ } else {
+ *region = cct->_conf->rgw_zonegroup;
+ service = "s3";
+ }
+ return;
+ }
+}
+
+int RGWRESTSimpleRequest::forward_request(const DoutPrefixProvider *dpp, const RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y, std::string service)
{
string date_str;
RGWEnv new_env;
req_info new_info(cct, &new_env);
new_info.rebuild_from(info);
+ string bucket_encode;
+ string request_uri_encode;
+ size_t pos = new_info.request_uri.substr(1, new_info.request_uri.size() - 1).find("/");
+ string bucket = new_info.request_uri.substr(1, pos);
+ url_encode(bucket, bucket_encode);
+ if (std::string::npos != pos)
+ request_uri_encode = string("/") + bucket_encode + new_info.request_uri.substr(pos + 1);
+ else
+ request_uri_encode = string("/") + bucket_encode;
+ new_info.request_uri = request_uri_encode;
+
+ for (auto& param : params) {
+ new_info.args.append(param.first, param.second);
+ }
new_env.set("HTTP_DATE", date_str.c_str());
+ const char* const content_md5 = info.env->get("HTTP_CONTENT_MD5");
+ if (content_md5) {
+ new_env.set("HTTP_CONTENT_MD5", content_md5);
+ }
+
+ string region;
+ string s;
+ if (!service.empty()) {
+ s = service;
+ }
+
+ scope_from_api_name(dpp, cct, host, api_name, ®ion, s);
+
+ const char *maybe_payload_hash = info.env->get("HTTP_X_AMZ_CONTENT_SHA256");
+ if (maybe_payload_hash && s != "iam") {
+ new_env.set("HTTP_X_AMZ_CONTENT_SHA256", maybe_payload_hash);
+ }
- int ret = sign_request(key, new_env, new_info);
+ int ret = sign_request(dpp, key, region, s, new_env, new_info, nullptr);
if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
return ret;
}
+ if (s == "iam") {
+ info.args.remove("PayloadHash");
+ }
+
for (const auto& kv: new_env.get_map()) {
headers.emplace_back(kv);
}
- map<string, string>& meta_map = new_info.x_meta_map;
+ meta_map_t& meta_map = new_info.x_meta_map;
for (const auto& kv: meta_map) {
headers.emplace_back(kv);
}
set_send_length(inbl->length());
}
- int r = process(new_info.method, new_url.c_str());
+ method = new_info.method;
+ url = new_url;
+
+ int r = process(y);
if (r < 0){
if (r == -EINVAL){
// curl_easy has errored, generally means the service is not available
response.append((char)0); /* NULL terminate response */
if (outbl) {
- outbl->claim(response);
+ *outbl = std::move(response);
}
return status;
}
class RGWRESTStreamOutCB : public RGWGetDataCB {
- RGWRESTStreamWriteRequest *req;
+ RGWRESTStreamS3PutObj *req;
public:
- explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
+ explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
};
{
dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
if (!bl_ofs && bl_len == bl.length()) {
- return req->add_output_data(bl);
+ req->add_send_data(bl);
+ return 0;
}
bufferptr bp(bl.c_str() + bl_ofs, bl_len);
bufferlist new_bl;
new_bl.push_back(bp);
- return req->add_output_data(new_bl);
+ req->add_send_data(new_bl);
+ return 0;
}
-RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
+RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
{
- delete cb;
-}
-
-int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
-{
- lock.Lock();
- if (status < 0) {
- int ret = status;
- lock.Unlock();
- return ret;
- }
- pending_send.push_back(bl);
- lock.Unlock();
-
- bool done;
- return http_manager.process_requests(false, &done);
+ delete out_cb;
}
static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
{
- if ((perm & check_perm) == perm) {
+ if ((perm & check_perm) == check_perm) {
grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
return true;
}
}
}
-static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
+static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
{
struct grant_type_to_header *t;
}
}
-int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
+RGWRESTGenerateHTTPHeaders::RGWRESTGenerateHTTPHeaders(CephContext *_cct, RGWEnv *_env, req_info *_info) :
+ DoutPrefix(_cct, dout_subsys, "rest gen http headers: "),
+ cct(_cct),
+ new_env(_env),
+ new_info(_info) {
+}
+
+void RGWRESTGenerateHTTPHeaders::init(const string& _method, const string& host,
+ const string& resource_prefix, const string& _url,
+ const string& resource, const param_vec_t& params,
+ std::optional<string> api_name)
{
- string resource = obj.bucket.name + "/" + obj.get_oid();
- string new_url = url;
- if (new_url[new_url.size() - 1] != '/')
- new_url.append("/");
+ scope_from_api_name(this, cct, host, api_name, ®ion, service);
+
+ string params_str;
+ map<string, string>& args = new_info->args.get_params();
+ do_get_params_str(params, args, params_str);
+
+ /* merge params with extra args so that we can sign correctly */
+ for (auto iter = params.begin(); iter != params.end(); ++iter) {
+ new_info->args.append(iter->first, iter->second);
+ }
+
+ url = _url + resource + params_str;
string date_str;
- get_new_date_str(date_str);
+ get_gmt_date_str(date_str);
- RGWEnv new_env;
- req_info new_info(cct, &new_env);
-
- string params_str;
- map<string, string>& args = new_info.args.get_params();
- get_params_str(args, params_str);
+ new_env->set("HTTP_DATE", date_str.c_str());
+ new_env->set("HTTP_HOST", host);
- new_url.append(resource + params_str);
+ method = _method;
+ new_info->method = method.c_str();
+ new_info->host = host;
- new_env.set("HTTP_DATE", date_str.c_str());
+ new_info->script_uri = "/";
+ new_info->script_uri.append(resource_prefix);
+ new_info->script_uri.append(resource);
+ new_info->request_uri = new_info->script_uri;
+}
- new_info.method = "PUT";
+static bool is_x_amz(const string& s) {
+ return boost::algorithm::starts_with(s, "x-amz-");
+}
- new_info.script_uri = "/";
- new_info.script_uri.append(resource);
- new_info.request_uri = new_info.script_uri;
+void RGWRESTGenerateHTTPHeaders::set_extra_headers(const map<string, string>& extra_headers)
+{
+ for (auto iter : extra_headers) {
+ const string& name = lowercase_dash_http_attr(iter.first);
+ new_env->set(name, iter.second.c_str());
+ if (is_x_amz(name)) {
+ new_info->x_meta_map[name] = iter.second;
+ }
+ }
+}
+
+int RGWRESTGenerateHTTPHeaders::set_obj_attrs(const DoutPrefixProvider *dpp, map<string, bufferlist>& rgw_attrs)
+{
+ map<string, string> new_attrs;
/* merge send headers */
- for (auto& attr: attrs) {
+ for (auto& attr: rgw_attrs) {
bufferlist& bl = attr.second;
const string& name = attr.first;
string val = bl.c_str();
if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
string header_name = RGW_AMZ_META_PREFIX;
header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
- new_env.set(header_name, val);
- new_info.x_meta_map[header_name] = val;
+ new_attrs[header_name] = val;
}
}
+
RGWAccessControlPolicy policy;
- int ret = rgw_policy_from_attrset(cct, attrs, &policy);
+ int ret = rgw_policy_from_attrset(dpp, cct, rgw_attrs, &policy);
if (ret < 0) {
- ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
return ret;
}
+ set_http_attrs(new_attrs);
+ set_policy(policy);
+
+ return 0;
+}
+
+void RGWRESTGenerateHTTPHeaders::set_http_attrs(const map<string, string>& http_attrs)
+{
+ /* merge send headers */
+ for (auto& attr: http_attrs) {
+ const string& val = attr.second;
+ const string& name = lowercase_dash_http_attr(attr.first);
+ if (is_x_amz(name)) {
+ new_env->set(name, val);
+ new_info->x_meta_map[name] = val;
+ } else {
+ new_env->set(attr.first, val); /* Ugh, using the uppercase representation,
+ as the signing function calls info.env.get("CONTENT_TYPE").
+ This needs to be cleaned up! */
+ }
+ }
+}
+
+void RGWRESTGenerateHTTPHeaders::set_policy(RGWAccessControlPolicy& policy)
+{
/* update acl headers */
RGWAccessControlList& acl = policy.get_acl();
multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
ACLPermission& perm = grant.get_permission();
grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
}
- add_grants_headers(grants_by_type, new_env, new_info.x_meta_map);
- ret = sign_request(key, new_env, new_info);
+ add_grants_headers(grants_by_type, *new_env, new_info->x_meta_map);
+}
+
+int RGWRESTGenerateHTTPHeaders::sign(const DoutPrefixProvider *dpp, RGWAccessKey& key, const bufferlist *opt_content)
+{
+ int ret = sign_request(dpp, key, region, service, *new_env, *new_info, opt_content);
if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
return ret;
}
- for (const auto& kv: new_env.get_map()) {
- headers.emplace_back(kv);
+ return 0;
+}
+
+void RGWRESTStreamS3PutObj::send_init(rgw::sal::Object* obj)
+{
+ string resource_str;
+ string resource;
+ string new_url = url;
+ string new_host = host;
+
+ const auto& bucket_name = obj->get_bucket()->get_name();
+
+ if (host_style == VirtualStyle) {
+ resource_str = obj->get_oid();
+
+ new_url = bucket_name + "." + new_url;
+ new_host = bucket_name + "." + new_host;
+ } else {
+ resource_str = bucket_name + "/" + obj->get_oid();
}
- cb = new RGWRESTStreamOutCB(this);
+ //do not encode slash in object key name
+ url_encode(resource_str, resource, false);
- set_send_length(obj_size);
+ if (new_url[new_url.size() - 1] != '/')
+ new_url.append("/");
- int r = http_manager.add_request(this, new_info.method, new_url.c_str());
- if (r < 0)
- return r;
+ method = "PUT";
+ headers_gen.init(method, new_host, resource_prefix, new_url, resource, params, api_name);
- return 0;
+ url = headers_gen.get_url();
}
-int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
+void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, bufferlist>& rgw_attrs)
{
- uint64_t sent = 0;
-
- dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
- lock.Lock();
- if (pending_send.empty() || status < 0) {
- lock.Unlock();
- return status;
- }
+ headers_gen.set_obj_attrs(dpp, rgw_attrs);
- list<bufferlist>::iterator iter = pending_send.begin();
- while (iter != pending_send.end() && len > 0) {
- bufferlist& bl = *iter;
-
- list<bufferlist>::iterator next_iter = iter;
- ++next_iter;
- lock.Unlock();
-
- uint64_t send_len = min(len, (size_t)bl.length());
+ send_ready(dpp, key);
+}
- memcpy(ptr, bl.c_str(), send_len);
+void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, const map<string, string>& http_attrs,
+ RGWAccessControlPolicy& policy)
+{
+ headers_gen.set_http_attrs(http_attrs);
+ headers_gen.set_policy(policy);
- ptr = (char *)ptr + send_len;
- len -= send_len;
- sent += send_len;
+ send_ready(dpp, key);
+}
- lock.Lock();
+void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key)
+{
+ headers_gen.sign(dpp, key, nullptr);
- bufferlist new_bl;
- if (bl.length() > send_len) {
- bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
- new_bl.append(bp);
- }
- pending_send.pop_front(); /* need to do this after we copy data from bl */
- if (new_bl.length()) {
- pending_send.push_front(new_bl);
- }
- iter = next_iter;
+ for (const auto& kv: new_env.get_map()) {
+ headers.emplace_back(kv);
}
- lock.Unlock();
- return sent;
+ out_cb = new RGWRESTStreamOutCB(this);
}
+void RGWRESTStreamS3PutObj::put_obj_init(const DoutPrefixProvider *dpp, RGWAccessKey& key, rgw::sal::Object* obj, map<string, bufferlist>& attrs)
+{
+ send_init(obj);
+ send_ready(dpp, key, attrs);
+}
void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
{
}
}
-static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
+static int parse_rgwx_mtime(const DoutPrefixProvider *dpp, CephContext *cct, const string& s, ceph::real_time *rt)
{
string err;
vector<string> vec;
long secs = strict_strtol(vec[0].c_str(), 10, &err);
long nsecs = 0;
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
return -EINVAL;
}
if (vec.size() > 1) {
nsecs = strict_strtol(vec[1].c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
return -EINVAL;
}
}
return 0;
}
-int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
+static void send_prepare_convert(const rgw_obj& obj, string *resource)
{
- int ret = http_manager.complete_requests();
- if (ret < 0)
- return ret;
+ string urlsafe_bucket, urlsafe_object;
+ url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
+ url_encode(obj.key.name, urlsafe_object);
+ *resource = urlsafe_bucket + "/" + urlsafe_object;
+}
- set_str_from_headers(out_headers, "ETAG", etag);
+int RGWRESTStreamRWRequest::send_request(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
+{
+ string resource;
+ send_prepare_convert(obj, &resource);
- if (mtime) {
- string mtime_str;
- set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
+ return send_request(dpp, &key, extra_headers, resource, mgr);
+}
- ret = parse_rgwx_mtime(cct, mtime_str, mtime);
- if (ret < 0) {
- return ret;
- }
- }
- return status;
+int RGWRESTStreamRWRequest::send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
+{
+ string resource;
+ send_prepare_convert(obj, &resource);
+
+ return do_send_prepare(dpp, &key, extra_headers, resource);
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ bufferlist *send_data)
{
- string urlsafe_bucket, urlsafe_object;
- url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
- url_encode(obj.key.name, urlsafe_object);
- string resource = urlsafe_bucket + "/" + urlsafe_object;
+ string new_resource;
+ //do not encode slash
+ url_encode(resource, new_resource, false);
- return send_request(&key, extra_headers, resource, nullptr, mgr);
+ return do_send_prepare(dpp, key, extra_headers, new_resource, send_data);
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
- bufferlist *send_data, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::do_send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ bufferlist *send_data)
{
string new_url = url;
- if (new_url[new_url.size() - 1] != '/')
+ if (!new_url.empty() && new_url.back() != '/')
new_url.append("/");
-
- string date_str;
- get_new_date_str(date_str);
-
- RGWEnv new_env;
- req_info new_info(cct, &new_env);
- string params_str;
- map<string, string>& args = new_info.args.get_params();
- get_params_str(args, params_str);
-
- /* merge params with extra args so that we can sign correctly */
- for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
- new_info.args.append(iter->first, iter->second);
- }
-
string new_resource;
+ string bucket_name;
+ string old_resource = resource;
+
if (resource[0] == '/') {
new_resource = resource.substr(1);
} else {
new_resource = resource;
}
- new_url.append(new_resource + params_str);
+ size_t pos = new_resource.find("/");
+ bucket_name = new_resource.substr(0, pos);
- new_env.set("HTTP_DATE", date_str.c_str());
+ //when dest is a bucket with out other params, uri should end up with '/'
+ if(pos == string::npos && params.size() == 0 && host_style == VirtualStyle) {
+ new_resource.append("/");
+ }
- for (map<string, string>::iterator iter = extra_headers.begin();
- iter != extra_headers.end(); ++iter) {
- new_env.set(iter->first.c_str(), iter->second.c_str());
+ if (host_style == VirtualStyle) {
+ new_url = protocol + "://" + bucket_name + "." + host;
+ if(pos == string::npos) {
+ new_resource = "";
+ } else {
+ new_resource = new_resource.substr(pos+1);
+ }
}
- new_info.method = method;
+ headers_gen.emplace(cct, &new_env, &new_info);
- new_info.script_uri = "/";
- new_info.script_uri.append(new_resource);
- new_info.request_uri = new_info.script_uri;
+ headers_gen->init(method, host, resource_prefix, new_url, new_resource, params, api_name);
- new_info.init_meta_info(NULL);
+ headers_gen->set_http_attrs(extra_headers);
if (key) {
- int ret = sign_request(*key, new_env, new_info);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
- return ret;
- }
+ sign_key = *key;
}
- for (const auto& kv: new_env.get_map()) {
- headers.emplace_back(kv);
+ if (send_data) {
+ set_send_length(send_data->length());
+ set_outbl(*send_data);
+ set_send_data_hint(true);
}
- bool send_data_hint = false;
- if (send_data) {
- outbl.claim(*send_data);
- send_data_hint = true;
+ method = new_info.method;
+ url = headers_gen->get_url();
+
+ return 0;
+}
+
+int RGWRESTStreamRWRequest::send_request(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ RGWHTTPManager *mgr, bufferlist *send_data)
+{
+ int ret = send_prepare(dpp, key, extra_headers, resource, send_data);
+ if (ret < 0) {
+ return ret;
}
- RGWHTTPManager *pmanager = &http_manager;
- if (mgr) {
- pmanager = mgr;
+ return send(mgr);
+}
+
+
+int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
+{
+ if (!headers_gen) {
+ ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): send_prepare() was not called: likey a bug!" << dendl;
+ return -EINVAL;
}
- int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
- if (r < 0)
- return r;
+ const bufferlist *outblp{nullptr};
- if (!mgr) {
- r = pmanager->complete_requests();
- if (r < 0)
+ if (send_len == outbl.length()) {
+ outblp = &outbl;
+ }
+
+ if (sign_key) {
+ int r = headers_gen->sign(this, *sign_key, outblp);
+ if (r < 0) {
+ ldpp_dout(this, 0) << "ERROR: failed to sign request" << dendl;
return r;
+ }
+ }
+
+ for (const auto& kv: new_env.get_map()) {
+ headers.emplace_back(kv);
}
+ if (!mgr) {
+ return RGWHTTP::send(this);
+ }
+
+ int r = mgr->add_request(this);
+ if (r < 0)
+ return r;
+
return 0;
}
-int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
+int RGWHTTPStreamRWRequest::complete_request(optional_yield y,
+ string *etag,
+ real_time *mtime,
+ uint64_t *psize,
+ map<string, string> *pattrs,
+ map<string, string> *pheaders)
{
- set_str_from_headers(out_headers, "ETAG", etag);
+ int ret = wait(y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ unique_lock guard(out_headers_lock);
+
+ if (etag) {
+ set_str_from_headers(out_headers, "ETAG", *etag);
+ }
if (status >= 0) {
if (mtime) {
string mtime_str;
set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
if (!mtime_str.empty()) {
- int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
+ int ret = parse_rgwx_mtime(this, cct, mtime_str, mtime);
if (ret < 0) {
return ret;
}
string err;
*psize = strict_strtoll(size_str.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
return -EIO;
}
}
}
- map<string, string>::iterator iter;
- for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
+ for (auto iter = out_headers.begin(); pattrs && iter != out_headers.end(); ++iter) {
const string& attr_name = iter->first;
if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
}
}
*dest = '\0';
- attrs[buf] = iter->second;
+ (*pattrs)[buf] = iter->second;
}
}
+
+ if (pheaders) {
+ *pheaders = std::move(out_headers);
+ }
return status;
}
-int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
+int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val)
{
if (name == "RGWX_EMBEDDED_METADATA_LEN") {
string err;
long len = strict_strtol(val.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
return -EINVAL;
}
return 0;
}
-int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
{
- bufferptr bp((const char *)ptr, len);
- bufferlist bl;
- bl.append(bp);
- int ret = cb->handle_data(bl, ofs, len);
- if (ret < 0)
- return ret;
+ size_t orig_len = len;
+
+ if (cb) {
+ in_data.append((const char *)ptr, len);
+
+ size_t orig_in_data_len = in_data.length();
+
+ int ret = cb->handle_data(in_data, pause);
+ if (ret < 0)
+ return ret;
+ if (ret == 0) {
+ in_data.clear();
+ } else {
+ /* partial read */
+ ceph_assert(in_data.length() <= orig_in_data_len);
+ len = ret;
+ bufferlist bl;
+ size_t left_to_read = orig_in_data_len - len;
+ if (in_data.length() > left_to_read) {
+ in_data.splice(0, in_data.length() - left_to_read, &bl);
+ }
+ }
+ }
ofs += len;
- return len;
+ return orig_len;
+}
+
+void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
+ std::lock_guard wl{write_lock};
+ stream_writes = s;
}
-int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
+void RGWHTTPStreamRWRequest::unpause_receive()
{
- if (outbl.length() == 0) {
- return 0;
+ std::lock_guard req_locker{get_req_lock()};
+ if (!read_paused) {
+ _set_read_paused(false);
}
+}
- uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
- if (send_size > 0) {
- memcpy(ptr, outbl.c_str() + write_ofs, send_size);
- write_ofs += send_size;
- }
- return send_size;
+void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
+{
+ std::scoped_lock locker{get_req_lock(), write_lock};
+ outbl.claim_append(bl);
+ _set_write_paused(false);
}
-class StreamIntoBufferlist : public RGWGetDataCB {
- bufferlist& bl;
-public:
- StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
- int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
- bl.claim_append(inbl);
- return bl_len;
- }
-};
+uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
+{
+ std::lock_guard wl{write_lock};
+ return outbl.length();
+}
+
+void RGWHTTPStreamRWRequest::finish_write()
+{
+ std::scoped_lock locker{get_req_lock(), write_lock};
+ write_stream_complete = true;
+ _set_write_paused(false);
+}
+
+int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
+{
+ uint64_t out_len;
+ uint64_t send_size;
+ {
+ std::lock_guard wl{write_lock};
+
+ if (outbl.length() == 0) {
+ if ((stream_writes && !write_stream_complete) ||
+ (write_ofs < send_len)) {
+ *pause = true;
+ }
+ return 0;
+ }
+
+ len = std::min(len, (size_t)outbl.length());
+
+ bufferlist bl;
+ outbl.splice(0, len, &bl);
+ send_size = bl.length();
+ if (send_size > 0) {
+ memcpy(ptr, bl.c_str(), send_size);
+ write_ofs += send_size;
+ }
+ out_len = outbl.length();
+ }
+ /* don't need to be under write_lock here, avoid deadlocks in case notify callback
+ * needs to lock */
+ if (write_drain_cb) {
+ write_drain_cb->notify(out_len);
+ }
+ return send_size;
+}