]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_client.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_common.h"
5 #include "rgw_rest_client.h"
6 #include "rgw_auth_s3.h"
7 #include "rgw_http_errors.h"
8
9 #include "common/armor.h"
10 #include "common/strtol.h"
11 #include "include/str_list.h"
12 #include "rgw_crypt_sanitize.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rgw
16
17 using namespace std;
18
19 int RGWHTTPSimpleRequest::get_status()
20 {
21 int retcode = get_req_retcode();
22 if (retcode < 0) {
23 return retcode;
24 }
25 return status;
26 }
27
28 int RGWHTTPSimpleRequest::handle_header(const string& name, const string& val)
29 {
30 if (name == "CONTENT_LENGTH") {
31 string err;
32 long len = strict_strtol(val.c_str(), 10, &err);
33 if (!err.empty()) {
34 ldpp_dout(this, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
35 return -EINVAL;
36 }
37
38 max_response = len;
39 }
40
41 return 0;
42 }
43
44 int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len)
45 {
46 unique_lock guard(out_headers_lock);
47
48 char line[len + 1];
49
50 char *s = (char *)ptr, *end = (char *)ptr + len;
51 char *p = line;
52 ldpp_dout(this, 30) << "receive_http_header" << dendl;
53
54 while (s != end) {
55 if (*s == '\r') {
56 s++;
57 continue;
58 }
59 if (*s == '\n') {
60 *p = '\0';
61 ldpp_dout(this, 30) << "received header:" << line << dendl;
62 // TODO: fill whatever data required here
63 char *l = line;
64 char *tok = strsep(&l, " \t:");
65 if (tok && l) {
66 while (*l == ' ')
67 l++;
68
69 if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
70 http_status = atoi(l);
71 if (http_status == 100) /* 100-continue response */
72 continue;
73 status = rgw_http_error_to_errno(http_status);
74 } else {
75 /* convert header field name to upper case */
76 char *src = tok;
77 char buf[len + 1];
78 size_t i;
79 for (i = 0; i < len && *src; ++i, ++src) {
80 switch (*src) {
81 case '-':
82 buf[i] = '_';
83 break;
84 default:
85 buf[i] = toupper(*src);
86 }
87 }
88 buf[i] = '\0';
89 out_headers[buf] = l;
90 int r = handle_header(buf, l);
91 if (r < 0)
92 return r;
93 }
94 }
95 }
96 if (s != end)
97 *p++ = *s++;
98 }
99 return 0;
100 }
101
102 static void get_new_date_str(string& date_str)
103 {
104 date_str = rgw_to_asctime(ceph_clock_now());
105 }
106
107 static void get_gmt_date_str(string& date_str)
108 {
109 auto now_time = ceph::real_clock::now();
110 time_t rawtime = ceph::real_clock::to_time_t(now_time);
111
112 char buffer[80];
113
114 struct tm timeInfo;
115 gmtime_r(&rawtime, &timeInfo);
116 strftime(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S %z", &timeInfo);
117
118 date_str = buffer;
119 }
120
121 int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len, bool* pause)
122 {
123 if (!send_iter)
124 return 0;
125
126 if (len > send_iter->get_remaining())
127 len = send_iter->get_remaining();
128
129 send_iter->copy(len, (char *)ptr);
130
131 return len;
132 }
133
134 int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
135 {
136 size_t cp_len, left_len;
137
138 left_len = max_response > response.length() ? (max_response - response.length()) : 0;
139 if (left_len == 0)
140 return 0; /* don't read extra data */
141
142 cp_len = (len > left_len) ? left_len : len;
143 bufferptr p((char *)ptr, cp_len);
144
145 response.append(p);
146
147 return 0;
148 }
149
150 static void append_param(string& dest, const string& name, const string& val)
151 {
152 if (dest.empty()) {
153 dest.append("?");
154 } else {
155 dest.append("&");
156 }
157 string url_name;
158 url_encode(name, url_name);
159 dest.append(url_name);
160
161 if (!val.empty()) {
162 string url_val;
163 url_encode(val, url_val);
164 dest.append("=");
165 dest.append(url_val);
166 }
167 }
168
169 static void do_get_params_str(const param_vec_t& params, map<string, string>& extra_args, string& dest)
170 {
171 map<string, string>::iterator miter;
172 for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
173 append_param(dest, miter->first, miter->second);
174 }
175 for (auto iter = params.begin(); iter != params.end(); ++iter) {
176 append_param(dest, iter->first, iter->second);
177 }
178 }
179
/*
 * Build the query string for this request into dest: the entries of
 * extra_args followed by this object's own `params`.
 */
void RGWHTTPSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
{
  do_get_params_str(params, extra_args, dest);
}
184
185 void RGWHTTPSimpleRequest::get_out_headers(map<string, string> *pheaders)
186 {
187 unique_lock guard(out_headers_lock);
188 pheaders->swap(out_headers);
189 out_headers.clear();
190 }
191
192 static int sign_request_v2(const DoutPrefixProvider *dpp, RGWAccessKey& key,
193 const string& region, const string& service,
194 RGWEnv& env, req_info& info,
195 const bufferlist *opt_content)
196 {
197 /* don't sign if no key is provided */
198 if (key.key.empty()) {
199 return 0;
200 }
201
202 auto cct = dpp->get_cct();
203
204 if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
205 for (const auto& i: env.get_map()) {
206 ldpp_dout(dpp, 20) << __func__ << "():> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
207 }
208 }
209
210 string canonical_header;
211 if (!rgw_create_s3_canonical_header(dpp, info, NULL, canonical_header, false)) {
212 ldpp_dout(dpp, 0) << "failed to create canonical s3 header" << dendl;
213 return -EINVAL;
214 }
215
216 ldpp_dout(dpp, 10) << "generated canonical header: " << canonical_header << dendl;
217
218 string digest;
219 try {
220 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
221 } catch (int ret) {
222 return ret;
223 }
224
225 string auth_hdr = "AWS " + key.id + ":" + digest;
226 ldpp_dout(dpp, 15) << "generated auth header: " << auth_hdr << dendl;
227
228 env.set("AUTHORIZATION", auth_hdr);
229
230 return 0;
231 }
232
233 static int sign_request_v4(const DoutPrefixProvider *dpp, RGWAccessKey& key,
234 const string& region, const string& service,
235 RGWEnv& env, req_info& info,
236 const bufferlist *opt_content)
237 {
238 /* don't sign if no key is provided */
239 if (key.key.empty()) {
240 return 0;
241 }
242
243 auto cct = dpp->get_cct();
244
245 if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
246 for (const auto& i: env.get_map()) {
247 ldpp_dout(dpp, 20) << __func__ << "():> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
248 }
249 }
250
251 auto sigv4_data = rgw::auth::s3::AWSSignerV4::prepare(dpp, key.id, region, service, info, opt_content, true);
252 auto sigv4_headers = sigv4_data.signature_factory(dpp, key.key, sigv4_data);
253
254 for (auto& entry : sigv4_headers) {
255 ldpp_dout(dpp, 20) << __func__ << "(): sigv4 header: " << entry.first << ": " << entry.second << dendl;
256 env.set(entry.first, entry.second);
257 }
258
259 return 0;
260 }
261
262 static int sign_request(const DoutPrefixProvider *dpp, RGWAccessKey& key,
263 const string& region, const string& service,
264 RGWEnv& env, req_info& info,
265 const bufferlist *opt_content)
266 {
267 auto authv = dpp->get_cct()->_conf.get_val<int64_t>("rgw_s3_client_max_sig_ver");
268 if (authv > 0 &&
269 authv <= 3) {
270 return sign_request_v2(dpp, key, region, service, env, info, opt_content);
271 }
272
273 return sign_request_v4(dpp, key, region, service, env, info, opt_content);
274 }
275
276 static string extract_region_name(string&& s)
277 {
278 if (s == "s3") {
279 return "us-east-1";
280 }
281 if (boost::algorithm::starts_with(s, "s3-")) {
282 return s.substr(3);
283 }
284 return std::move(s);
285 }
286
287
288 static bool identify_scope(const DoutPrefixProvider *dpp,
289 CephContext *cct,
290 const string& host,
291 string *region,
292 string *service)
293 {
294 if (!boost::algorithm::ends_with(host, "amazonaws.com")) {
295 ldpp_dout(dpp, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl;
296 return false;
297 }
298
299 vector<string> vec;
300
301 get_str_vec(host, ".", vec);
302
303 *service = "s3"; /* default */
304
305 for (auto iter = vec.begin(); iter != vec.end(); ++iter) {
306 auto& s = *iter;
307 if (s == "s3" ||
308 s == "execute-api") {
309 if (s == "execute-api") {
310 *service = s;
311 }
312 ++iter;
313 if (iter == vec.end()) {
314 ldpp_dout(dpp, 0) << "WARNING: cannot identify region name from host name: " << host << dendl;
315 return false;
316 }
317 auto& next = *iter;
318 if (next == "amazonaws") {
319 *region = "us-east-1";
320 return true;
321 }
322 *region = next;
323 return true;
324 } else if (boost::algorithm::starts_with(s, "s3-")) {
325 *region = extract_region_name(std::move(s));
326 return true;
327 }
328 }
329
330 return false;
331 }
332
333 static void scope_from_api_name(const DoutPrefixProvider *dpp,
334 CephContext *cct,
335 const string& host,
336 std::optional<string> api_name,
337 string *region,
338 string *service)
339 {
340 if (api_name) {
341 *region = *api_name;
342 *service = "s3";
343 return;
344 }
345
346 if (!identify_scope(dpp, cct, host, region, service)) {
347 *region = cct->_conf->rgw_zonegroup;
348 *service = "s3";
349 return;
350 }
351 }
352
/*
 * Re-issue an incoming request against this object's `url`, signed with `key`.
 *
 * Steps, in order:
 *   1. rebuild a private req_info/RGWEnv from `info` and URL-encode the first
 *      path component of the request URI;
 *   2. add this object's `params` to the new request's args, set the date and
 *      carry over the content-md5 / x-amz-content-sha256 values if present;
 *   3. sign the request and copy the resulting environment and x-meta entries
 *      into `headers`;
 *   4. assemble the final URL, attach `inbl` (if any) as the body and call
 *      process().
 *
 * On success the response (with one trailing 0 byte appended) is moved into
 * *outbl when outbl is non-null.
 *
 * Returns a negative error from signing or process() (-EINVAL from process()
 * is mapped to -ERR_SERVICE_UNAVAILABLE), otherwise the `status` member.
 *
 * NOTE(review): the `max_response` parameter is not referenced in this body.
 */
int RGWRESTSimpleRequest::forward_request(const DoutPrefixProvider *dpp, RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y)
{

  string date_str;
  get_new_date_str(date_str);

  RGWEnv new_env;
  req_info new_info(cct, &new_env);
  new_info.rebuild_from(info);
  string bucket_encode;
  string request_uri_encode;
  /* `pos` is the offset of the second '/' measured from index 1 of the URI,
   * i.e. the length of the first path component.
   * NOTE(review): assumes request_uri is non-empty and starts with '/';
   * substr(1, ...) throws std::out_of_range on an empty string -- confirm the
   * caller guarantees this. */
  size_t pos = new_info.request_uri.substr(1, new_info.request_uri.size() - 1).find("/");
  string bucket = new_info.request_uri.substr(1, pos);
  url_encode(bucket, bucket_encode);
  /* index pos + 1 in the full URI is that second '/', so the remainder is
   * appended unchanged (not re-encoded) */
  if (std::string::npos != pos)
    request_uri_encode = string("/") + bucket_encode + new_info.request_uri.substr(pos + 1);
  else
    request_uri_encode = string("/") + bucket_encode;
  new_info.request_uri = request_uri_encode;

  for (auto& param : params) {
    new_info.args.append(param.first, param.second);
  }

  new_env.set("HTTP_DATE", date_str.c_str());
  const char* const content_md5 = info.env->get("HTTP_CONTENT_MD5");
  if (content_md5) {
    new_env.set("HTTP_CONTENT_MD5", content_md5);
  }

  string region;
  string service;

  scope_from_api_name(dpp, cct, host, api_name, &region, &service);

  const char *maybe_payload_hash = info.env->get("HTTP_X_AMZ_CONTENT_SHA256");
  if (maybe_payload_hash) {
    new_env.set("HTTP_X_AMZ_CONTENT_SHA256", maybe_payload_hash);
  }

  /* no payload is handed to the signer here (nullptr) */
  int ret = sign_request(dpp, key, region, service, new_env, new_info, nullptr);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
    return ret;
  }

  /* everything in the signed environment becomes a header ... */
  for (const auto& kv: new_env.get_map()) {
    headers.emplace_back(kv);
  }

  /* ... as does every x-meta entry */
  meta_map_t& meta_map = new_info.x_meta_map;
  for (const auto& kv: meta_map) {
    headers.emplace_back(kv);
  }

  string params_str;
  get_params_str(info.args.get_params(), params_str);

  /* join url and resource with exactly one '/' between them.
   * NOTE(review): new_url[new_url.size() - 1] assumes `url` is non-empty. */
  string new_url = url;
  string& resource = new_info.request_uri;
  string new_resource = resource;
  if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
    new_url = new_url.substr(0, new_url.size() - 1);
  } else if (resource[0] != '/') {
    new_resource = "/";
    new_resource.append(resource);
  }
  new_url.append(new_resource + params_str);

  bufferlist::iterator bliter;

  if (inbl) {
    bliter = inbl->begin();
    /* NOTE(review): send_iter is left pointing at this local after the
     * function returns; presumably it is only read while process() runs --
     * confirm. */
    send_iter = &bliter;

    set_send_length(inbl->length());
  }

  method = new_info.method;
  url = new_url;

  int r = process(y);
  if (r < 0){
    if (r == -EINVAL){
      // curl_easy has errored, generally means the service is not available
      r = -ERR_SERVICE_UNAVAILABLE;
    }
    return r;
  }

  response.append((char)0); /* NULL terminate response */

  if (outbl) {
    *outbl = std::move(response);
  }

  return status;
}
451
/*
 * RGWGetDataCB that forwards the data it is handed to a
 * RGWRESTStreamS3PutObj request. The request pointer is stored as given;
 * nothing in this class deletes it.
 */
class RGWRESTStreamOutCB : public RGWGetDataCB {
  RGWRESTStreamS3PutObj *req; /* request that receives the data */
public:
  explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
};
458
459 int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
460 {
461 dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
462 if (!bl_ofs && bl_len == bl.length()) {
463 req->add_send_data(bl);
464 return 0;
465 }
466
467 bufferptr bp(bl.c_str() + bl_ofs, bl_len);
468 bufferlist new_bl;
469 new_bl.push_back(bp);
470
471 req->add_send_data(new_bl);
472 return 0;
473 }
474
/* Release the output callback object allocated in send_ready(). */
RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
{
  delete out_cb;
}
479
480 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
481 {
482 string& s = grants_by_type[perm];
483
484 if (!s.empty())
485 s.append(", ");
486
487 string id_type_str;
488 ACLGranteeType& type = grant.get_type();
489 switch (type.get_type()) {
490 case ACL_TYPE_GROUP:
491 id_type_str = "uri";
492 break;
493 case ACL_TYPE_EMAIL_USER:
494 id_type_str = "emailAddress";
495 break;
496 default:
497 id_type_str = "id";
498 }
499 rgw_user id;
500 grant.get_id(id);
501 s.append(id_type_str + "=\"" + id.to_str() + "\"");
502 }
503
/* Pairs a permission bit mask with the request header that carries it. */
struct grant_type_to_header {
  int type;           /* RGW_PERM_* value */
  const char *header; /* header name; NULL marks the end of a table */
};
508
/*
 * Permission -> "x-amz-grant-*" header table, terminated by a NULL header.
 * grants_by_type_add_perm() walks it front to back and stops at the first
 * match, so RGW_PERM_FULL_CONTROL is tested before the narrower permissions.
 */
struct grant_type_to_header grants_headers_def[] = {
  { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
  { RGW_PERM_READ, "x-amz-grant-read"},
  { RGW_PERM_WRITE, "x-amz-grant-write"},
  { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"},
  { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"},
  { 0, NULL}
};
517
518 static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
519 {
520 if ((perm & check_perm) == check_perm) {
521 grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
522 return true;
523 }
524 return false;
525 }
526
527 static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
528 {
529 struct grant_type_to_header *t;
530
531 for (t = grants_headers_def; t->header; t++) {
532 if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
533 return;
534 }
535 }
536
537 static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
538 {
539 struct grant_type_to_header *t;
540
541 for (t = grants_headers_def; t->header; t++) {
542 map<int, string>::iterator iter = grants.find(t->type);
543 if (iter != grants.end()) {
544 env.set(t->header,iter->second);
545 meta_map[t->header] = iter->second;
546 }
547 }
548 }
549
/*
 * Bind the generator to the environment and request info it will fill in.
 * Both pointers are stored as given. Log lines emitted through this object
 * are prefixed with "rest gen http headers: ".
 */
RGWRESTGenerateHTTPHeaders::RGWRESTGenerateHTTPHeaders(CephContext *_cct, RGWEnv *_env, req_info *_info) :
  DoutPrefix(_cct, dout_subsys, "rest gen http headers: "),
  cct(_cct),
  new_env(_env),
  new_info(_info) {
}
556
557 void RGWRESTGenerateHTTPHeaders::init(const string& _method, const string& host,
558 const string& resource_prefix, const string& _url,
559 const string& resource, const param_vec_t& params,
560 std::optional<string> api_name)
561 {
562 scope_from_api_name(this, cct, host, api_name, &region, &service);
563
564 string params_str;
565 map<string, string>& args = new_info->args.get_params();
566 do_get_params_str(params, args, params_str);
567
568 /* merge params with extra args so that we can sign correctly */
569 for (auto iter = params.begin(); iter != params.end(); ++iter) {
570 new_info->args.append(iter->first, iter->second);
571 }
572
573 url = _url + resource + params_str;
574
575 string date_str;
576 get_gmt_date_str(date_str);
577
578 new_env->set("HTTP_DATE", date_str.c_str());
579 new_env->set("HTTP_HOST", host);
580
581 method = _method;
582 new_info->method = method.c_str();
583 new_info->host = host;
584
585 new_info->script_uri = "/";
586 new_info->script_uri.append(resource_prefix);
587 new_info->script_uri.append(resource);
588 new_info->request_uri = new_info->script_uri;
589 }
590
591 static bool is_x_amz(const string& s) {
592 return boost::algorithm::starts_with(s, "x-amz-");
593 }
594
595 void RGWRESTGenerateHTTPHeaders::set_extra_headers(const map<string, string>& extra_headers)
596 {
597 for (auto iter : extra_headers) {
598 const string& name = lowercase_dash_http_attr(iter.first);
599 new_env->set(name, iter.second.c_str());
600 if (is_x_amz(name)) {
601 new_info->x_meta_map[name] = iter.second;
602 }
603 }
604 }
605
606 int RGWRESTGenerateHTTPHeaders::set_obj_attrs(const DoutPrefixProvider *dpp, map<string, bufferlist>& rgw_attrs)
607 {
608 map<string, string> new_attrs;
609
610 /* merge send headers */
611 for (auto& attr: rgw_attrs) {
612 bufferlist& bl = attr.second;
613 const string& name = attr.first;
614 string val = bl.c_str();
615 if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
616 string header_name = RGW_AMZ_META_PREFIX;
617 header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
618 new_attrs[header_name] = val;
619 }
620 }
621
622 RGWAccessControlPolicy policy;
623 int ret = rgw_policy_from_attrset(dpp, cct, rgw_attrs, &policy);
624 if (ret < 0) {
625 ldpp_dout(dpp, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
626 return ret;
627 }
628
629 set_http_attrs(new_attrs);
630 set_policy(policy);
631
632 return 0;
633 }
634
635 void RGWRESTGenerateHTTPHeaders::set_http_attrs(const map<string, string>& http_attrs)
636 {
637 /* merge send headers */
638 for (auto& attr: http_attrs) {
639 const string& val = attr.second;
640 const string& name = lowercase_dash_http_attr(attr.first);
641 if (is_x_amz(name)) {
642 new_env->set(name, val);
643 new_info->x_meta_map[name] = val;
644 } else {
645 new_env->set(attr.first, val); /* Ugh, using the uppercase representation,
646 as the signing function calls info.env.get("CONTENT_TYPE").
647 This needs to be cleaned up! */
648 }
649 }
650 }
651
652 void RGWRESTGenerateHTTPHeaders::set_policy(RGWAccessControlPolicy& policy)
653 {
654 /* update acl headers */
655 RGWAccessControlList& acl = policy.get_acl();
656 multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
657 multimap<string, ACLGrant>::iterator giter;
658 map<int, string> grants_by_type;
659 for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
660 ACLGrant& grant = giter->second;
661 ACLPermission& perm = grant.get_permission();
662 grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
663 }
664 add_grants_headers(grants_by_type, *new_env, new_info->x_meta_map);
665 }
666
667 int RGWRESTGenerateHTTPHeaders::sign(const DoutPrefixProvider *dpp, RGWAccessKey& key, const bufferlist *opt_content)
668 {
669 int ret = sign_request(dpp, key, region, service, *new_env, *new_info, opt_content);
670 if (ret < 0) {
671 ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
672 return ret;
673 }
674
675 return 0;
676 }
677
678 void RGWRESTStreamS3PutObj::send_init(rgw::sal::Object* obj)
679 {
680 string resource_str;
681 string resource;
682 string new_url = url;
683 string new_host = host;
684
685 const auto& bucket_name = obj->get_bucket()->get_name();
686
687 if (host_style == VirtualStyle) {
688 resource_str = obj->get_oid();
689
690 new_url = bucket_name + "." + new_url;
691 new_host = bucket_name + "." + new_host;
692 } else {
693 resource_str = bucket_name + "/" + obj->get_oid();
694 }
695
696 //do not encode slash in object key name
697 url_encode(resource_str, resource, false);
698
699 if (new_url[new_url.size() - 1] != '/')
700 new_url.append("/");
701
702 method = "PUT";
703 headers_gen.init(method, new_host, resource_prefix, new_url, resource, params, api_name);
704
705 url = headers_gen.get_url();
706 }
707
/*
 * Finish request setup from stored object attributes: derive headers from
 * rgw_attrs, then sign and arm the request via send_ready(dpp, key).
 *
 * NOTE(review): the int returned by set_obj_attrs() is discarded, so signing
 * proceeds even if the attributes could not be converted.
 */
void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, bufferlist>& rgw_attrs)
{
  headers_gen.set_obj_attrs(dpp, rgw_attrs);

  send_ready(dpp, key);
}
714
/*
 * Finish request setup from explicit HTTP attributes and an ACL policy: add
 * both to the generated headers, then sign and arm the request via
 * send_ready(dpp, key).
 */
void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key, const map<string, string>& http_attrs,
                                       RGWAccessControlPolicy& policy)
{
  headers_gen.set_http_attrs(http_attrs);
  headers_gen.set_policy(policy);

  send_ready(dpp, key);
}
723
/*
 * Sign the request, copy the resulting environment into `headers` and create
 * the output callback that feeds object data into this request.
 *
 * NOTE(review): the int returned by sign() is discarded.
 * NOTE(review): out_cb is assigned without releasing a previous value; this
 * presumably relies on send_ready() being called once per object -- confirm.
 */
void RGWRESTStreamS3PutObj::send_ready(const DoutPrefixProvider *dpp, RGWAccessKey& key)
{
  /* no payload is handed to the signer here (nullptr) */
  headers_gen.sign(dpp, key, nullptr);

  for (const auto& kv: new_env.get_map()) {
    headers.emplace_back(kv);
  }

  out_cb = new RGWRESTStreamOutCB(this);
}
734
/*
 * Convenience wrapper: send_init() for `obj` followed by send_ready() with
 * the object's stored attributes.
 */
void RGWRESTStreamS3PutObj::put_obj_init(const DoutPrefixProvider *dpp, RGWAccessKey& key, rgw::sal::Object* obj, map<string, bufferlist>& attrs)
{
  send_init(obj);
  send_ready(dpp, key, attrs);
}
740
/*
 * Copy the value of header_name from out_headers into str.
 * str is cleared when the header is absent. out_headers is not modified.
 */
void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
{
  const auto found = out_headers.find(header_name);
  if (found == out_headers.end()) {
    str.clear();
    return;
  }
  str = found->second;
}
750
751 static int parse_rgwx_mtime(const DoutPrefixProvider *dpp, CephContext *cct, const string& s, ceph::real_time *rt)
752 {
753 string err;
754 vector<string> vec;
755
756 get_str_vec(s, ".", vec);
757
758 if (vec.empty()) {
759 return -EINVAL;
760 }
761
762 long secs = strict_strtol(vec[0].c_str(), 10, &err);
763 long nsecs = 0;
764 if (!err.empty()) {
765 ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
766 return -EINVAL;
767 }
768
769 if (vec.size() > 1) {
770 nsecs = strict_strtol(vec[1].c_str(), 10, &err);
771 if (!err.empty()) {
772 ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
773 return -EINVAL;
774 }
775 }
776
777 *rt = utime_t(secs, nsecs).to_real_time();
778
779 return 0;
780 }
781
782 static void send_prepare_convert(const rgw_obj& obj, string *resource)
783 {
784 string urlsafe_bucket, urlsafe_object;
785 url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
786 url_encode(obj.key.name, urlsafe_object);
787 *resource = urlsafe_bucket + "/" + urlsafe_object;
788 }
789
790 int RGWRESTStreamRWRequest::send_request(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
791 {
792 string resource;
793 send_prepare_convert(obj, &resource);
794
795 return send_request(dpp, &key, extra_headers, resource, mgr);
796 }
797
798 int RGWRESTStreamRWRequest::send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
799 {
800 string resource;
801 send_prepare_convert(obj, &resource);
802
803 return do_send_prepare(dpp, &key, extra_headers, resource);
804 }
805
806 int RGWRESTStreamRWRequest::send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
807 bufferlist *send_data)
808 {
809 string new_resource;
810 //do not encode slash
811 url_encode(resource, new_resource, false);
812
813 return do_send_prepare(dpp, key, extra_headers, new_resource, send_data);
814 }
815
816 int RGWRESTStreamRWRequest::do_send_prepare(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
817 bufferlist *send_data)
818 {
819 string new_url = url;
820 if (!new_url.empty() && new_url.back() != '/')
821 new_url.append("/");
822
823 string new_resource;
824 string bucket_name;
825 string old_resource = resource;
826
827 if (resource[0] == '/') {
828 new_resource = resource.substr(1);
829 } else {
830 new_resource = resource;
831 }
832
833 size_t pos = new_resource.find("/");
834 bucket_name = new_resource.substr(0, pos);
835
836 //when dest is a bucket with out other params, uri should end up with '/'
837 if(pos == string::npos && params.size() == 0 && host_style == VirtualStyle) {
838 new_resource.append("/");
839 }
840
841 if (host_style == VirtualStyle) {
842 new_url = protocol + "://" + bucket_name + "." + host;
843 if(pos == string::npos) {
844 new_resource = "";
845 } else {
846 new_resource = new_resource.substr(pos+1);
847 }
848 }
849
850 headers_gen.emplace(cct, &new_env, &new_info);
851
852 headers_gen->init(method, host, resource_prefix, new_url, new_resource, params, api_name);
853
854 headers_gen->set_http_attrs(extra_headers);
855
856 if (key) {
857 sign_key = *key;
858 }
859
860 if (send_data) {
861 set_send_length(send_data->length());
862 set_outbl(*send_data);
863 set_send_data_hint(true);
864 }
865
866 method = new_info.method;
867 url = headers_gen->get_url();
868
869 return 0;
870 }
871
872 int RGWRESTStreamRWRequest::send_request(const DoutPrefixProvider *dpp, RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
873 RGWHTTPManager *mgr, bufferlist *send_data)
874 {
875 int ret = send_prepare(dpp, key, extra_headers, resource, send_data);
876 if (ret < 0) {
877 return ret;
878 }
879
880 return send(mgr);
881 }
882
883
884 int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
885 {
886 if (!headers_gen) {
887 ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): send_prepare() was not called: likey a bug!" << dendl;
888 return -EINVAL;
889 }
890
891 const bufferlist *outblp{nullptr};
892
893 if (send_len == outbl.length()) {
894 outblp = &outbl;
895 }
896
897 if (sign_key) {
898 int r = headers_gen->sign(this, *sign_key, outblp);
899 if (r < 0) {
900 ldpp_dout(this, 0) << "ERROR: failed to sign request" << dendl;
901 return r;
902 }
903 }
904
905 for (const auto& kv: new_env.get_map()) {
906 headers.emplace_back(kv);
907 }
908
909 if (!mgr) {
910 return RGWHTTP::send(this);
911 }
912
913 int r = mgr->add_request(this);
914 if (r < 0)
915 return r;
916
917 return 0;
918 }
919
920 int RGWHTTPStreamRWRequest::complete_request(optional_yield y,
921 string *etag,
922 real_time *mtime,
923 uint64_t *psize,
924 map<string, string> *pattrs,
925 map<string, string> *pheaders)
926 {
927 int ret = wait(y);
928 if (ret < 0) {
929 return ret;
930 }
931
932 unique_lock guard(out_headers_lock);
933
934 if (etag) {
935 set_str_from_headers(out_headers, "ETAG", *etag);
936 }
937 if (status >= 0) {
938 if (mtime) {
939 string mtime_str;
940 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
941 if (!mtime_str.empty()) {
942 int ret = parse_rgwx_mtime(this, cct, mtime_str, mtime);
943 if (ret < 0) {
944 return ret;
945 }
946 } else {
947 *mtime = real_time();
948 }
949 }
950 if (psize) {
951 string size_str;
952 set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
953 string err;
954 *psize = strict_strtoll(size_str.c_str(), 10, &err);
955 if (!err.empty()) {
956 ldpp_dout(this, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
957 return -EIO;
958 }
959 }
960 }
961
962 for (auto iter = out_headers.begin(); pattrs && iter != out_headers.end(); ++iter) {
963 const string& attr_name = iter->first;
964 if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
965 string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
966 const char *src = name.c_str();
967 char buf[name.size() + 1];
968 char *dest = buf;
969 for (; *src; ++src, ++dest) {
970 switch(*src) {
971 case '_':
972 *dest = '-';
973 break;
974 default:
975 *dest = tolower(*src);
976 }
977 }
978 *dest = '\0';
979 (*pattrs)[buf] = iter->second;
980 }
981 }
982
983 if (pheaders) {
984 *pheaders = std::move(out_headers);
985 }
986 return status;
987 }
988
989 int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val)
990 {
991 if (name == "RGWX_EMBEDDED_METADATA_LEN") {
992 string err;
993 long len = strict_strtol(val.c_str(), 10, &err);
994 if (!err.empty()) {
995 ldpp_dout(this, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
996 return -EINVAL;
997 }
998
999 cb->set_extra_data_len(len);
1000 }
1001 return 0;
1002 }
1003
/*
 * Accept `len` bytes of response body.
 *
 * With a receive callback installed, the bytes are appended to in_data and
 * the callback is invoked on the accumulated buffer. Its return value
 * decides what is kept:
 *   < 0  - returned to the caller as an error;
 *   == 0 - in_data is cleared;
 *   > 0  - treated as a partial read (see below).
 * Without a callback the bytes are only counted.
 *
 * Returns the callback's negative error, otherwise the original `len`.
 */
int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
{
  size_t orig_len = len;

  if (cb) {
    in_data.append((const char *)ptr, len);

    size_t orig_in_data_len = in_data.length();

    int ret = cb->handle_data(in_data, pause);
    if (ret < 0)
      return ret;
    if (ret == 0) {
      in_data.clear();
    } else {
      /* partial read */
      /* the callback may have shortened in_data but must not have grown it */
      ceph_assert(in_data.length() <= orig_in_data_len);
      len = ret;
      bufferlist bl;
      /* keep at most (buffer length before the callback - ret) bytes:
       * anything in front of that tail is spliced off into a scratch list
       * and discarded */
      size_t left_to_read = orig_in_data_len - len;
      if (in_data.length() > left_to_read) {
        in_data.splice(0, in_data.length() - left_to_read, &bl);
      }
    }
  }
  /* advance by `len`, which is the callback's return value after a partial read */
  ofs += len;
  return orig_len;
}
1032
/* Set the stream_writes flag (read by send_data()) while holding write_lock. */
void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
  std::lock_guard wl{write_lock};
  stream_writes = s;
}
1037
/*
 * Ask for receiving to be resumed, under the request lock.
 *
 * NOTE(review): _set_read_paused(false) is only called when read_paused is
 * already false, which looks inverted for an "unpause" operation. This may be
 * intentional depending on how read_paused is maintained elsewhere -- confirm
 * before changing.
 */
void RGWHTTPStreamRWRequest::unpause_receive()
{
  std::lock_guard req_locker{get_req_lock()};
  if (!read_paused) {
    _set_read_paused(false);
  }
}
1045
/*
 * Queue more request body: the contents of bl are claimed into outbl, then
 * _set_write_paused(false) is called. Both the request lock and write_lock
 * are held.
 */
void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
{
  std::scoped_lock locker{get_req_lock(), write_lock};
  outbl.claim_append(bl);
  _set_write_paused(false);
}
1052
/* Number of bytes currently queued in outbl, read under write_lock. */
uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
{
  std::lock_guard wl{write_lock};
  return outbl.length();
}
1058
/*
 * Mark the streamed request body as complete and call
 * _set_write_paused(false). Both the request lock and write_lock are held.
 */
void RGWHTTPStreamRWRequest::finish_write()
{
  std::scoped_lock locker{get_req_lock(), write_lock};
  write_stream_complete = true;
  _set_write_paused(false);
}
1065
1066 int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
1067 {
1068 uint64_t out_len;
1069 uint64_t send_size;
1070 {
1071 std::lock_guard wl{write_lock};
1072
1073 if (outbl.length() == 0) {
1074 if ((stream_writes && !write_stream_complete) ||
1075 (write_ofs < send_len)) {
1076 *pause = true;
1077 }
1078 return 0;
1079 }
1080
1081 len = std::min(len, (size_t)outbl.length());
1082
1083 bufferlist bl;
1084 outbl.splice(0, len, &bl);
1085 send_size = bl.length();
1086 if (send_size > 0) {
1087 memcpy(ptr, bl.c_str(), send_size);
1088 write_ofs += send_size;
1089 }
1090
1091 out_len = outbl.length();
1092 }
1093 /* don't need to be under write_lock here, avoid deadlocks in case notify callback
1094 * needs to lock */
1095 if (write_drain_cb) {
1096 write_drain_cb->notify(out_len);
1097 }
1098 return send_size;
1099 }