]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_client.cc
b0d6e31ca40ad82a4a356c79483a862aed985296
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_common.h"
5 #include "rgw_rest_client.h"
6 #include "rgw_auth_s3.h"
7 #include "rgw_http_errors.h"
8
9 #include "common/armor.h"
10 #include "common/strtol.h"
11 #include "include/str_list.h"
12 #include "rgw_crypt_sanitize.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rgw
16
17 int RGWHTTPSimpleRequest::get_status()
18 {
19 int retcode = get_req_retcode();
20 if (retcode < 0) {
21 return retcode;
22 }
23 return status;
24 }
25
26 int RGWHTTPSimpleRequest::handle_header(const string& name, const string& val)
27 {
28 if (name == "CONTENT_LENGTH") {
29 string err;
30 long len = strict_strtol(val.c_str(), 10, &err);
31 if (!err.empty()) {
32 ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
33 return -EINVAL;
34 }
35
36 max_response = len;
37 }
38
39 return 0;
40 }
41
42 int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len)
43 {
44 unique_lock guard(out_headers_lock);
45
46 char line[len + 1];
47
48 char *s = (char *)ptr, *end = (char *)ptr + len;
49 char *p = line;
50 ldout(cct, 10) << "receive_http_header" << dendl;
51
52 while (s != end) {
53 if (*s == '\r') {
54 s++;
55 continue;
56 }
57 if (*s == '\n') {
58 *p = '\0';
59 ldout(cct, 10) << "received header:" << line << dendl;
60 // TODO: fill whatever data required here
61 char *l = line;
62 char *tok = strsep(&l, " \t:");
63 if (tok && l) {
64 while (*l == ' ')
65 l++;
66
67 if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
68 http_status = atoi(l);
69 if (http_status == 100) /* 100-continue response */
70 continue;
71 status = rgw_http_error_to_errno(http_status);
72 } else {
73 /* convert header field name to upper case */
74 char *src = tok;
75 char buf[len + 1];
76 size_t i;
77 for (i = 0; i < len && *src; ++i, ++src) {
78 switch (*src) {
79 case '-':
80 buf[i] = '_';
81 break;
82 default:
83 buf[i] = toupper(*src);
84 }
85 }
86 buf[i] = '\0';
87 out_headers[buf] = l;
88 int r = handle_header(buf, l);
89 if (r < 0)
90 return r;
91 }
92 }
93 }
94 if (s != end)
95 *p++ = *s++;
96 }
97 return 0;
98 }
99
100 static void get_new_date_str(string& date_str)
101 {
102 date_str = rgw_to_asctime(ceph_clock_now());
103 }
104
105 static void get_gmt_date_str(string& date_str)
106 {
107 auto now_time = ceph::real_clock::now();
108 time_t rawtime = ceph::real_clock::to_time_t(now_time);
109
110 char buffer[80];
111
112 struct tm timeInfo;
113 gmtime_r(&rawtime, &timeInfo);
114 strftime(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S %z", &timeInfo);
115
116 date_str = buffer;
117 }
118
119 int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *_method, const char *resource, optional_yield y)
120 {
121 method = _method;
122 string new_url = url;
123 string new_resource = resource;
124
125 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
126 new_url = new_url.substr(0, new_url.size() - 1);
127 } else if (resource[0] != '/') {
128 new_resource = "/";
129 new_resource.append(resource);
130 }
131 new_url.append(new_resource);
132 url = new_url;
133
134 string date_str;
135 get_new_date_str(date_str);
136 headers.push_back(pair<string, string>("HTTP_DATE", date_str));
137
138 string canonical_header;
139 meta_map_t meta_map;
140 map<string, string> sub_resources;
141
142 rgw_create_s3_canonical_header(method.c_str(), NULL, NULL, date_str.c_str(),
143 meta_map, meta_map, url.c_str(), sub_resources,
144 canonical_header);
145
146 string digest;
147 try {
148 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
149 } catch (int ret) {
150 return ret;
151 }
152
153 string auth_hdr = "AWS " + key.id + ":" + digest;
154
155 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
156
157 headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
158 int r = process(y);
159 if (r < 0)
160 return r;
161
162 return status;
163 }
164
165 int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len, bool* pause)
166 {
167 if (!send_iter)
168 return 0;
169
170 if (len > send_iter->get_remaining())
171 len = send_iter->get_remaining();
172
173 send_iter->copy(len, (char *)ptr);
174
175 return len;
176 }
177
178 int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
179 {
180 size_t cp_len, left_len;
181
182 left_len = max_response > response.length() ? (max_response - response.length()) : 0;
183 if (left_len == 0)
184 return 0; /* don't read extra data */
185
186 cp_len = (len > left_len) ? left_len : len;
187 bufferptr p((char *)ptr, cp_len);
188
189 response.append(p);
190
191 return 0;
192 }
193
194 static void append_param(string& dest, const string& name, const string& val)
195 {
196 if (dest.empty()) {
197 dest.append("?");
198 } else {
199 dest.append("&");
200 }
201 string url_name;
202 url_encode(name, url_name);
203 dest.append(url_name);
204
205 if (!val.empty()) {
206 string url_val;
207 url_encode(val, url_val);
208 dest.append("=");
209 dest.append(url_val);
210 }
211 }
212
213 static void do_get_params_str(const param_vec_t& params, map<string, string>& extra_args, string& dest)
214 {
215 map<string, string>::iterator miter;
216 for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
217 append_param(dest, miter->first, miter->second);
218 }
219 for (auto iter = params.begin(); iter != params.end(); ++iter) {
220 append_param(dest, iter->first, iter->second);
221 }
222 }
223
224 void RGWHTTPSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
225 {
226 do_get_params_str(params, extra_args, dest);
227 }
228
229 void RGWHTTPSimpleRequest::get_out_headers(map<string, string> *pheaders)
230 {
231 unique_lock guard(out_headers_lock);
232 pheaders->swap(out_headers);
233 out_headers.clear();
234 }
235
236 static int sign_request(CephContext *cct, RGWAccessKey& key, RGWEnv& env, req_info& info)
237 {
238 /* don't sign if no key is provided */
239 if (key.key.empty()) {
240 return 0;
241 }
242
243 if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
244 for (const auto& i: env.get_map()) {
245 ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
246 }
247 }
248
249 string canonical_header;
250 if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
251 ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
252 return -EINVAL;
253 }
254
255 ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
256
257 string digest;
258 try {
259 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
260 } catch (int ret) {
261 return ret;
262 }
263
264 string auth_hdr = "AWS " + key.id + ":" + digest;
265 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
266
267 env.set("AUTHORIZATION", auth_hdr);
268
269 return 0;
270 }
271
272 int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y)
273 {
274
275 string date_str;
276 get_new_date_str(date_str);
277
278 RGWEnv new_env;
279 req_info new_info(cct, &new_env);
280 new_info.rebuild_from(info);
281 string bucket_encode;
282 string request_uri_encode;
283 size_t pos = new_info.request_uri.substr(1, new_info.request_uri.size() - 1).find("/");
284 string bucket = new_info.request_uri.substr(1, pos);
285 url_encode(bucket, bucket_encode);
286 if (std::string::npos != pos)
287 request_uri_encode = string("/") + bucket_encode + new_info.request_uri.substr(pos + 1);
288 else
289 request_uri_encode = string("/") + bucket_encode;
290 new_info.request_uri = request_uri_encode;
291 new_env.set("HTTP_DATE", date_str.c_str());
292 const char* const content_md5 = info.env->get("HTTP_CONTENT_MD5");
293 if (content_md5) {
294 new_env.set("HTTP_CONTENT_MD5", content_md5);
295 }
296 int ret = sign_request(cct, key, new_env, new_info);
297 if (ret < 0) {
298 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
299 return ret;
300 }
301
302 for (const auto& kv: new_env.get_map()) {
303 headers.emplace_back(kv);
304 }
305
306 meta_map_t& meta_map = new_info.x_meta_map;
307 for (const auto& kv: meta_map) {
308 headers.emplace_back(kv);
309 }
310
311 string params_str;
312 get_params_str(info.args.get_params(), params_str);
313
314 string new_url = url;
315 string& resource = new_info.request_uri;
316 string new_resource = resource;
317 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
318 new_url = new_url.substr(0, new_url.size() - 1);
319 } else if (resource[0] != '/') {
320 new_resource = "/";
321 new_resource.append(resource);
322 }
323 new_url.append(new_resource + params_str);
324
325 bufferlist::iterator bliter;
326
327 if (inbl) {
328 bliter = inbl->begin();
329 send_iter = &bliter;
330
331 set_send_length(inbl->length());
332 }
333
334 method = new_info.method;
335 url = new_url;
336
337 int r = process(y);
338 if (r < 0){
339 if (r == -EINVAL){
340 // curl_easy has errored, generally means the service is not available
341 r = -ERR_SERVICE_UNAVAILABLE;
342 }
343 return r;
344 }
345
346 response.append((char)0); /* NULL terminate response */
347
348 if (outbl) {
349 *outbl = std::move(response);
350 }
351
352 return status;
353 }
354
355 class RGWRESTStreamOutCB : public RGWGetDataCB {
356 RGWRESTStreamS3PutObj *req;
357 public:
358 explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
359 int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
360 };
361
362 int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
363 {
364 dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
365 if (!bl_ofs && bl_len == bl.length()) {
366 req->add_send_data(bl);
367 return 0;
368 }
369
370 bufferptr bp(bl.c_str() + bl_ofs, bl_len);
371 bufferlist new_bl;
372 new_bl.push_back(bp);
373
374 req->add_send_data(new_bl);
375 return 0;
376 }
377
378 RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
379 {
380 delete out_cb;
381 }
382
383 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
384 {
385 string& s = grants_by_type[perm];
386
387 if (!s.empty())
388 s.append(", ");
389
390 string id_type_str;
391 ACLGranteeType& type = grant.get_type();
392 switch (type.get_type()) {
393 case ACL_TYPE_GROUP:
394 id_type_str = "uri";
395 break;
396 case ACL_TYPE_EMAIL_USER:
397 id_type_str = "emailAddress";
398 break;
399 default:
400 id_type_str = "id";
401 }
402 rgw_user id;
403 grant.get_id(id);
404 s.append(id_type_str + "=\"" + id.to_str() + "\"");
405 }
406
/* Pairs a permission value with the name of an x-amz-grant-* header. */
struct grant_type_to_header {
  int type;            /* permission value (an RGW_PERM_* constant in the table below) */
  const char *header;  /* header name; a null pointer terminates the table */
};
411
412 struct grant_type_to_header grants_headers_def[] = {
413 { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
414 { RGW_PERM_READ, "x-amz-grant-read"},
415 { RGW_PERM_WRITE, "x-amz-grant-write"},
416 { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"},
417 { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"},
418 { 0, NULL}
419 };
420
421 static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
422 {
423 if ((perm & check_perm) == check_perm) {
424 grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
425 return true;
426 }
427 return false;
428 }
429
430 static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
431 {
432 struct grant_type_to_header *t;
433
434 for (t = grants_headers_def; t->header; t++) {
435 if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
436 return;
437 }
438 }
439
440 static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
441 {
442 struct grant_type_to_header *t;
443
444 for (t = grants_headers_def; t->header; t++) {
445 map<int, string>::iterator iter = grants.find(t->type);
446 if (iter != grants.end()) {
447 env.set(t->header,iter->second);
448 meta_map[t->header] = iter->second;
449 }
450 }
451 }
452
453 void RGWRESTGenerateHTTPHeaders::init(const string& _method, const string& _url, const string& resource, const param_vec_t& params)
454 {
455 string params_str;
456 map<string, string>& args = new_info->args.get_params();
457 do_get_params_str(params, args, params_str);
458
459 /* merge params with extra args so that we can sign correctly */
460 for (auto iter = params.begin(); iter != params.end(); ++iter) {
461 new_info->args.append(iter->first, iter->second);
462 }
463
464 url = _url + resource + params_str;
465
466 string date_str;
467 get_gmt_date_str(date_str);
468
469 new_env->set("HTTP_DATE", date_str.c_str());
470
471 method = _method;
472 new_info->method = method.c_str();
473
474 new_info->script_uri = "/";
475 new_info->script_uri.append(resource);
476 new_info->request_uri = new_info->script_uri;
477 }
478
/* True when s begins with the (case-sensitive) prefix "x-amz-". */
static bool is_x_amz(const std::string& s) {
  static const char prefix[] = "x-amz-";
  return s.compare(0, sizeof(prefix) - 1, prefix) == 0;
}
482
483 void RGWRESTGenerateHTTPHeaders::set_extra_headers(const map<string, string>& extra_headers)
484 {
485 for (auto iter : extra_headers) {
486 const string& name = lowercase_dash_http_attr(iter.first);
487 new_env->set(name, iter.second.c_str());
488 if (is_x_amz(name)) {
489 new_info->x_meta_map[name] = iter.second;
490 }
491 }
492 }
493
494 int RGWRESTGenerateHTTPHeaders::set_obj_attrs(map<string, bufferlist>& rgw_attrs)
495 {
496 map<string, string> new_attrs;
497
498 /* merge send headers */
499 for (auto& attr: rgw_attrs) {
500 bufferlist& bl = attr.second;
501 const string& name = attr.first;
502 string val = bl.c_str();
503 if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
504 string header_name = RGW_AMZ_META_PREFIX;
505 header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
506 new_attrs[header_name] = val;
507 }
508 }
509
510 RGWAccessControlPolicy policy;
511 int ret = rgw_policy_from_attrset(cct, rgw_attrs, &policy);
512 if (ret < 0) {
513 ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
514 return ret;
515 }
516
517 set_http_attrs(new_attrs);
518 set_policy(policy);
519
520 return 0;
521 }
522
523 static std::set<string> keep_headers = { "content-type",
524 "content-encoding",
525 "content-disposition",
526 "content-language" };
527
528 void RGWRESTGenerateHTTPHeaders::set_http_attrs(const map<string, string>& http_attrs)
529 {
530 /* merge send headers */
531 for (auto& attr: http_attrs) {
532 const string& val = attr.second;
533 const string& name = lowercase_dash_http_attr(attr.first);
534 if (is_x_amz(name)) {
535 new_env->set(name, val);
536 new_info->x_meta_map[name] = val;
537 } else {
538 new_env->set(attr.first, val); /* Ugh, using the uppercase representation,
539 as the signing function calls info.env.get("CONTENT_TYPE").
540 This needs to be cleaned up! */
541 }
542 }
543 }
544
545 void RGWRESTGenerateHTTPHeaders::set_policy(RGWAccessControlPolicy& policy)
546 {
547 /* update acl headers */
548 RGWAccessControlList& acl = policy.get_acl();
549 multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
550 multimap<string, ACLGrant>::iterator giter;
551 map<int, string> grants_by_type;
552 for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
553 ACLGrant& grant = giter->second;
554 ACLPermission& perm = grant.get_permission();
555 grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
556 }
557 add_grants_headers(grants_by_type, *new_env, new_info->x_meta_map);
558 }
559
560 int RGWRESTGenerateHTTPHeaders::sign(RGWAccessKey& key)
561 {
562 int ret = sign_request(cct, key, *new_env, *new_info);
563 if (ret < 0) {
564 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
565 return ret;
566 }
567
568 return 0;
569 }
570
571 void RGWRESTStreamS3PutObj::send_init(rgw::sal::RGWObject* obj)
572 {
573 string resource_str;
574 string resource;
575 string new_url = url;
576
577 if (host_style == VirtualStyle) {
578 resource_str = obj->get_oid();
579 new_url = obj->get_bucket()->get_name() + "." + new_url;
580 } else {
581 resource_str = obj->get_bucket()->get_name() + "/" + obj->get_oid();
582 }
583
584 //do not encode slash in object key name
585 url_encode(resource_str, resource, false);
586
587 if (new_url[new_url.size() - 1] != '/')
588 new_url.append("/");
589
590 method = "PUT";
591 headers_gen.init(method, new_url, resource, params);
592
593 url = headers_gen.get_url();
594 }
595
596 int RGWRESTStreamS3PutObj::send_ready(RGWAccessKey& key, map<string, bufferlist>& rgw_attrs, bool send)
597 {
598 headers_gen.set_obj_attrs(rgw_attrs);
599
600 return send_ready(key, send);
601 }
602
603 int RGWRESTStreamS3PutObj::send_ready(RGWAccessKey& key, const map<string, string>& http_attrs,
604 RGWAccessControlPolicy& policy, bool send)
605 {
606 headers_gen.set_http_attrs(http_attrs);
607 headers_gen.set_policy(policy);
608
609 return send_ready(key, send);
610 }
611
612 int RGWRESTStreamS3PutObj::send_ready(RGWAccessKey& key, bool send)
613 {
614 headers_gen.sign(key);
615
616 for (const auto& kv: new_env.get_map()) {
617 headers.emplace_back(kv);
618 }
619
620 out_cb = new RGWRESTStreamOutCB(this);
621
622 if (send) {
623 int r = RGWHTTP::send(this);
624 if (r < 0)
625 return r;
626 }
627
628 return 0;
629 }
630
631 int RGWRESTStreamS3PutObj::put_obj_init(RGWAccessKey& key, rgw::sal::RGWObject* obj, uint64_t obj_size, map<string, bufferlist>& attrs, bool send)
632 {
633 send_init(obj);
634 return send_ready(key, attrs, send);
635 }
636
/*
 * Copy the value stored under header_name into str, or clear str when the
 * header is absent. out_headers is not modified.
 */
void set_str_from_headers(std::map<std::string, std::string>& out_headers, const std::string& header_name, std::string& str)
{
  const auto found = out_headers.find(header_name);
  if (found == out_headers.end()) {
    str.clear();
    return;
  }
  str = found->second;
}
646
647 static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
648 {
649 string err;
650 vector<string> vec;
651
652 get_str_vec(s, ".", vec);
653
654 if (vec.empty()) {
655 return -EINVAL;
656 }
657
658 long secs = strict_strtol(vec[0].c_str(), 10, &err);
659 long nsecs = 0;
660 if (!err.empty()) {
661 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
662 return -EINVAL;
663 }
664
665 if (vec.size() > 1) {
666 nsecs = strict_strtol(vec[1].c_str(), 10, &err);
667 if (!err.empty()) {
668 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
669 return -EINVAL;
670 }
671 }
672
673 *rt = utime_t(secs, nsecs).to_real_time();
674
675 return 0;
676 }
677
678 static void send_prepare_convert(const rgw_obj& obj, string *resource)
679 {
680 string urlsafe_bucket, urlsafe_object;
681 url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
682 url_encode(obj.key.name, urlsafe_object);
683 *resource = urlsafe_bucket + "/" + urlsafe_object;
684 }
685
686 int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
687 {
688 string resource;
689 send_prepare_convert(obj, &resource);
690
691 return send_request(&key, extra_headers, resource, mgr);
692 }
693
694 int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
695 {
696 string resource;
697 send_prepare_convert(obj, &resource);
698
699 return do_send_prepare(&key, extra_headers, resource);
700 }
701
702 int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
703 bufferlist *send_data)
704 {
705 string new_resource;
706 //do not encode slash
707 url_encode(resource, new_resource, false);
708
709 return do_send_prepare(key, extra_headers, new_resource, send_data);
710 }
711
712 int RGWRESTStreamRWRequest::do_send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
713 bufferlist *send_data)
714 {
715 string new_url = url;
716 if (new_url[new_url.size() - 1] != '/')
717 new_url.append("/");
718
719 RGWEnv new_env;
720 req_info new_info(cct, &new_env);
721
722 string new_resource;
723 string bucket_name;
724 string old_resource = resource;
725
726 if (resource[0] == '/') {
727 new_resource = resource.substr(1);
728 } else {
729 new_resource = resource;
730 }
731
732 size_t pos = new_resource.find("/");
733 bucket_name = new_resource.substr(0, pos);
734
735 //when dest is a bucket with out other params, uri should end up with '/'
736 if(pos == string::npos && params.size() == 0 && host_style == VirtualStyle) {
737 new_resource.append("/");
738 }
739
740 if (host_style == VirtualStyle) {
741 new_url = bucket_name + "." + new_url;
742 if(pos == string::npos) {
743 new_resource = "";
744 } else {
745 new_resource = new_resource.substr(pos+1);
746 }
747 }
748
749 RGWRESTGenerateHTTPHeaders headers_gen(cct, &new_env, &new_info);
750
751 headers_gen.init(method, new_url, new_resource, params);
752
753 headers_gen.set_http_attrs(extra_headers);
754
755 if (key) {
756 #if 0
757 new_info.init_meta_info(nullptr);
758 #endif
759
760 int ret = headers_gen.sign(*key);
761 if (ret < 0) {
762 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
763 return ret;
764 }
765 }
766
767 for (const auto& kv: new_env.get_map()) {
768 headers.emplace_back(kv);
769 }
770
771 if (send_data) {
772 set_send_length(send_data->length());
773 set_outbl(*send_data);
774 set_send_data_hint(true);
775 }
776
777
778 method = new_info.method;
779 url = headers_gen.get_url();
780
781 return 0;
782 }
783
784 int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
785 RGWHTTPManager *mgr, bufferlist *send_data)
786 {
787 int ret = send_prepare(key, extra_headers, resource, send_data);
788 if (ret < 0) {
789 return ret;
790 }
791
792 return send(mgr);
793 }
794
795
796 int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
797 {
798 if (!mgr) {
799 return RGWHTTP::send(this);
800 }
801
802 int r = mgr->add_request(this);
803 if (r < 0)
804 return r;
805
806 return 0;
807 }
808
809 int RGWRESTStreamRWRequest::complete_request(optional_yield y,
810 string *etag,
811 real_time *mtime,
812 uint64_t *psize,
813 map<string, string> *pattrs,
814 map<string, string> *pheaders)
815 {
816 int ret = wait(y);
817 if (ret < 0) {
818 return ret;
819 }
820
821 unique_lock guard(out_headers_lock);
822
823 if (etag) {
824 set_str_from_headers(out_headers, "ETAG", *etag);
825 }
826 if (status >= 0) {
827 if (mtime) {
828 string mtime_str;
829 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
830 if (!mtime_str.empty()) {
831 int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
832 if (ret < 0) {
833 return ret;
834 }
835 } else {
836 *mtime = real_time();
837 }
838 }
839 if (psize) {
840 string size_str;
841 set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
842 string err;
843 *psize = strict_strtoll(size_str.c_str(), 10, &err);
844 if (!err.empty()) {
845 ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
846 return -EIO;
847 }
848 }
849 }
850
851 for (auto iter = out_headers.begin(); pattrs && iter != out_headers.end(); ++iter) {
852 const string& attr_name = iter->first;
853 if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
854 string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
855 const char *src = name.c_str();
856 char buf[name.size() + 1];
857 char *dest = buf;
858 for (; *src; ++src, ++dest) {
859 switch(*src) {
860 case '_':
861 *dest = '-';
862 break;
863 default:
864 *dest = tolower(*src);
865 }
866 }
867 *dest = '\0';
868 (*pattrs)[buf] = iter->second;
869 }
870 }
871
872 if (pheaders) {
873 *pheaders = std::move(out_headers);
874 }
875 return status;
876 }
877
878 int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val)
879 {
880 if (name == "RGWX_EMBEDDED_METADATA_LEN") {
881 string err;
882 long len = strict_strtol(val.c_str(), 10, &err);
883 if (!err.empty()) {
884 ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
885 return -EINVAL;
886 }
887
888 cb->set_extra_data_len(len);
889 }
890 return 0;
891 }
892
893 int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
894 {
895 size_t orig_len = len;
896
897 if (cb) {
898 in_data.append((const char *)ptr, len);
899
900 size_t orig_in_data_len = in_data.length();
901
902 int ret = cb->handle_data(in_data, pause);
903 if (ret < 0)
904 return ret;
905 if (ret == 0) {
906 in_data.clear();
907 } else {
908 /* partial read */
909 ceph_assert(in_data.length() <= orig_in_data_len);
910 len = ret;
911 bufferlist bl;
912 size_t left_to_read = orig_in_data_len - len;
913 if (in_data.length() > left_to_read) {
914 in_data.splice(0, in_data.length() - left_to_read, &bl);
915 }
916 }
917 }
918 ofs += len;
919 return orig_len;
920 }
921
922 void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
923 std::lock_guard wl{write_lock};
924 stream_writes = s;
925 }
926
927 void RGWHTTPStreamRWRequest::unpause_receive()
928 {
929 std::lock_guard req_locker{get_req_lock()};
930 if (!read_paused) {
931 _set_read_paused(false);
932 }
933 }
934
935 void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
936 {
937 std::scoped_lock locker{get_req_lock(), write_lock};
938 outbl.claim_append(bl);
939 _set_write_paused(false);
940 }
941
942 uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
943 {
944 std::lock_guard wl{write_lock};
945 return outbl.length();
946 }
947
948 void RGWHTTPStreamRWRequest::finish_write()
949 {
950 std::scoped_lock locker{get_req_lock(), write_lock};
951 write_stream_complete = true;
952 _set_write_paused(false);
953 }
954
955 int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
956 {
957 uint64_t out_len;
958 uint64_t send_size;
959 {
960 std::lock_guard wl{write_lock};
961
962 if (outbl.length() == 0) {
963 if ((stream_writes && !write_stream_complete) ||
964 (write_ofs < send_len)) {
965 *pause = true;
966 }
967 return 0;
968 }
969
970 len = std::min(len, (size_t)outbl.length());
971
972 bufferlist bl;
973 outbl.splice(0, len, &bl);
974 send_size = bl.length();
975 if (send_size > 0) {
976 memcpy(ptr, bl.c_str(), send_size);
977 write_ofs += send_size;
978 }
979
980 out_len = outbl.length();
981 }
982 /* don't need to be under write_lock here, avoid deadlocks in case notify callback
983 * needs to lock */
984 if (write_drain_cb) {
985 write_drain_cb->notify(out_len);
986 }
987 return send_size;
988 }
989
990 class StreamIntoBufferlist : public RGWGetDataCB {
991 bufferlist& bl;
992 public:
993 explicit StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
994 int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
995 bl.claim_append(inbl);
996 return bl_len;
997 }
998 };
999