]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_client.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_common.h"
5 #include "rgw_rest_client.h"
6 #include "rgw_auth_s3.h"
7 #include "rgw_http_errors.h"
8 #include "rgw_rados.h"
9
10 #include "common/armor.h"
11 #include "common/strtol.h"
12 #include "include/str_list.h"
13 #include "rgw_crypt_sanitize.h"
14
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_rgw
17
18 int RGWHTTPSimpleRequest::get_status()
19 {
20 int retcode = get_req_retcode();
21 if (retcode < 0) {
22 return retcode;
23 }
24 return status;
25 }
26
27 int RGWHTTPSimpleRequest::handle_header(const string& name, const string& val)
28 {
29 if (name == "CONTENT_LENGTH") {
30 string err;
31 long len = strict_strtol(val.c_str(), 10, &err);
32 if (!err.empty()) {
33 ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
34 return -EINVAL;
35 }
36
37 max_response = len;
38 }
39
40 return 0;
41 }
42
43 int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len)
44 {
45 unique_lock guard(out_headers_lock);
46
47 char line[len + 1];
48
49 char *s = (char *)ptr, *end = (char *)ptr + len;
50 char *p = line;
51 ldout(cct, 10) << "receive_http_header" << dendl;
52
53 while (s != end) {
54 if (*s == '\r') {
55 s++;
56 continue;
57 }
58 if (*s == '\n') {
59 *p = '\0';
60 ldout(cct, 10) << "received header:" << line << dendl;
61 // TODO: fill whatever data required here
62 char *l = line;
63 char *tok = strsep(&l, " \t:");
64 if (tok && l) {
65 while (*l == ' ')
66 l++;
67
68 if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
69 http_status = atoi(l);
70 if (http_status == 100) /* 100-continue response */
71 continue;
72 status = rgw_http_error_to_errno(http_status);
73 } else {
74 /* convert header field name to upper case */
75 char *src = tok;
76 char buf[len + 1];
77 size_t i;
78 for (i = 0; i < len && *src; ++i, ++src) {
79 switch (*src) {
80 case '-':
81 buf[i] = '_';
82 break;
83 default:
84 buf[i] = toupper(*src);
85 }
86 }
87 buf[i] = '\0';
88 out_headers[buf] = l;
89 int r = handle_header(buf, l);
90 if (r < 0)
91 return r;
92 }
93 }
94 }
95 if (s != end)
96 *p++ = *s++;
97 }
98 return 0;
99 }
100
101 static void get_new_date_str(string& date_str)
102 {
103 date_str = rgw_to_asctime(ceph_clock_now());
104 }
105
106 static void get_gmt_date_str(string& date_str)
107 {
108 auto now_time = ceph::real_clock::now();
109 time_t rawtime = ceph::real_clock::to_time_t(now_time);
110
111 char buffer[80];
112
113 struct tm timeInfo;
114 gmtime_r(&rawtime, &timeInfo);
115 strftime(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S %z", &timeInfo);
116
117 date_str = buffer;
118 }
119
120 int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *_method, const char *resource)
121 {
122 method = _method;
123 string new_url = url;
124 string new_resource = resource;
125
126 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
127 new_url = new_url.substr(0, new_url.size() - 1);
128 } else if (resource[0] != '/') {
129 new_resource = "/";
130 new_resource.append(resource);
131 }
132 new_url.append(new_resource);
133 url = new_url;
134
135 string date_str;
136 get_new_date_str(date_str);
137 headers.push_back(pair<string, string>("HTTP_DATE", date_str));
138
139 string canonical_header;
140 meta_map_t meta_map;
141 map<string, string> sub_resources;
142
143 rgw_create_s3_canonical_header(method.c_str(), NULL, NULL, date_str.c_str(),
144 meta_map, meta_map, url.c_str(), sub_resources,
145 canonical_header);
146
147 string digest;
148 try {
149 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
150 } catch (int ret) {
151 return ret;
152 }
153
154 string auth_hdr = "AWS " + key.id + ":" + digest;
155
156 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
157
158 headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
159 int r = process(null_yield);
160 if (r < 0)
161 return r;
162
163 return status;
164 }
165
166 int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len, bool* pause)
167 {
168 if (!send_iter)
169 return 0;
170
171 if (len > send_iter->get_remaining())
172 len = send_iter->get_remaining();
173
174 send_iter->copy(len, (char *)ptr);
175
176 return len;
177 }
178
179 int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
180 {
181 size_t cp_len, left_len;
182
183 left_len = max_response > response.length() ? (max_response - response.length()) : 0;
184 if (left_len == 0)
185 return 0; /* don't read extra data */
186
187 cp_len = (len > left_len) ? left_len : len;
188 bufferptr p((char *)ptr, cp_len);
189
190 response.append(p);
191
192 return 0;
193 }
194
195 static void append_param(string& dest, const string& name, const string& val)
196 {
197 if (dest.empty()) {
198 dest.append("?");
199 } else {
200 dest.append("&");
201 }
202 string url_name;
203 url_encode(name, url_name);
204 dest.append(url_name);
205
206 if (!val.empty()) {
207 string url_val;
208 url_encode(val, url_val);
209 dest.append("=");
210 dest.append(url_val);
211 }
212 }
213
214 static void do_get_params_str(const param_vec_t& params, map<string, string>& extra_args, string& dest)
215 {
216 map<string, string>::iterator miter;
217 for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
218 append_param(dest, miter->first, miter->second);
219 }
220 for (auto iter = params.begin(); iter != params.end(); ++iter) {
221 append_param(dest, iter->first, iter->second);
222 }
223 }
224
225 void RGWHTTPSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
226 {
227 do_get_params_str(params, extra_args, dest);
228 }
229
230 void RGWHTTPSimpleRequest::get_out_headers(map<string, string> *pheaders)
231 {
232 unique_lock guard(out_headers_lock);
233 pheaders->swap(out_headers);
234 out_headers.clear();
235 }
236
237 static int sign_request(CephContext *cct, RGWAccessKey& key, RGWEnv& env, req_info& info)
238 {
239 /* don't sign if no key is provided */
240 if (key.key.empty()) {
241 return 0;
242 }
243
244 if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
245 for (const auto& i: env.get_map()) {
246 ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
247 }
248 }
249
250 string canonical_header;
251 if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
252 ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
253 return -EINVAL;
254 }
255
256 ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
257
258 string digest;
259 try {
260 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
261 } catch (int ret) {
262 return ret;
263 }
264
265 string auth_hdr = "AWS " + key.id + ":" + digest;
266 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
267
268 env.set("AUTHORIZATION", auth_hdr);
269
270 return 0;
271 }
272
273 int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
274 {
275
276 string date_str;
277 get_new_date_str(date_str);
278
279 RGWEnv new_env;
280 req_info new_info(cct, &new_env);
281 new_info.rebuild_from(info);
282
283 new_env.set("HTTP_DATE", date_str.c_str());
284
285 int ret = sign_request(cct, key, new_env, new_info);
286 if (ret < 0) {
287 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
288 return ret;
289 }
290
291 for (const auto& kv: new_env.get_map()) {
292 headers.emplace_back(kv);
293 }
294
295 meta_map_t& meta_map = new_info.x_meta_map;
296 for (const auto& kv: meta_map) {
297 headers.emplace_back(kv);
298 }
299
300 string params_str;
301 get_params_str(info.args.get_params(), params_str);
302
303 string new_url = url;
304 string& resource = new_info.request_uri;
305 string new_resource = resource;
306 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
307 new_url = new_url.substr(0, new_url.size() - 1);
308 } else if (resource[0] != '/') {
309 new_resource = "/";
310 new_resource.append(resource);
311 }
312 new_url.append(new_resource + params_str);
313
314 bufferlist::iterator bliter;
315
316 if (inbl) {
317 bliter = inbl->begin();
318 send_iter = &bliter;
319
320 set_send_length(inbl->length());
321 }
322
323 method = new_info.method;
324 url = new_url;
325
326 int r = process(null_yield);
327 if (r < 0){
328 if (r == -EINVAL){
329 // curl_easy has errored, generally means the service is not available
330 r = -ERR_SERVICE_UNAVAILABLE;
331 }
332 return r;
333 }
334
335 response.append((char)0); /* NULL terminate response */
336
337 if (outbl) {
338 outbl->claim(response);
339 }
340
341 return status;
342 }
343
344 class RGWRESTStreamOutCB : public RGWGetDataCB {
345 RGWRESTStreamS3PutObj *req;
346 public:
347 explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
348 int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
349 };
350
351 int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
352 {
353 dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
354 if (!bl_ofs && bl_len == bl.length()) {
355 req->add_send_data(bl);
356 return 0;
357 }
358
359 bufferptr bp(bl.c_str() + bl_ofs, bl_len);
360 bufferlist new_bl;
361 new_bl.push_back(bp);
362
363 req->add_send_data(new_bl);
364 return 0;
365 }
366
367 RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
368 {
369 delete out_cb;
370 }
371
372 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
373 {
374 string& s = grants_by_type[perm];
375
376 if (!s.empty())
377 s.append(", ");
378
379 string id_type_str;
380 ACLGranteeType& type = grant.get_type();
381 switch (type.get_type()) {
382 case ACL_TYPE_GROUP:
383 id_type_str = "uri";
384 break;
385 case ACL_TYPE_EMAIL_USER:
386 id_type_str = "emailAddress";
387 break;
388 default:
389 id_type_str = "id";
390 }
391 rgw_user id;
392 grant.get_id(id);
393 s.append(id_type_str + "=\"" + id.to_str() + "\"");
394 }
395
396 struct grant_type_to_header {
397 int type;
398 const char *header;
399 };
400
401 struct grant_type_to_header grants_headers_def[] = {
402 { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
403 { RGW_PERM_READ, "x-amz-grant-read"},
404 { RGW_PERM_WRITE, "x-amz-grant-write"},
405 { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"},
406 { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"},
407 { 0, NULL}
408 };
409
410 static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
411 {
412 if ((perm & check_perm) == check_perm) {
413 grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
414 return true;
415 }
416 return false;
417 }
418
419 static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
420 {
421 struct grant_type_to_header *t;
422
423 for (t = grants_headers_def; t->header; t++) {
424 if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
425 return;
426 }
427 }
428
429 static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
430 {
431 struct grant_type_to_header *t;
432
433 for (t = grants_headers_def; t->header; t++) {
434 map<int, string>::iterator iter = grants.find(t->type);
435 if (iter != grants.end()) {
436 env.set(t->header,iter->second);
437 meta_map[t->header] = iter->second;
438 }
439 }
440 }
441
442 void RGWRESTGenerateHTTPHeaders::init(const string& _method, const string& _url, const string& resource, const param_vec_t& params)
443 {
444 string params_str;
445 map<string, string>& args = new_info->args.get_params();
446 do_get_params_str(params, args, params_str);
447
448 /* merge params with extra args so that we can sign correctly */
449 for (auto iter = params.begin(); iter != params.end(); ++iter) {
450 new_info->args.append(iter->first, iter->second);
451 }
452
453 url = _url + resource + params_str;
454
455 string date_str;
456 get_gmt_date_str(date_str);
457
458 new_env->set("HTTP_DATE", date_str.c_str());
459
460 method = _method;
461 new_info->method = method.c_str();
462
463 new_info->script_uri = "/";
464 new_info->script_uri.append(resource);
465 new_info->request_uri = new_info->script_uri;
466 }
467
468 static bool is_x_amz(const string& s) {
469 return boost::algorithm::starts_with(s, "x-amz-");
470 }
471
472 void RGWRESTGenerateHTTPHeaders::set_extra_headers(const map<string, string>& extra_headers)
473 {
474 for (auto iter : extra_headers) {
475 const string& name = lowercase_dash_http_attr(iter.first);
476 new_env->set(name, iter.second.c_str());
477 if (is_x_amz(name)) {
478 new_info->x_meta_map[name] = iter.second;
479 }
480 }
481 }
482
483 int RGWRESTGenerateHTTPHeaders::set_obj_attrs(map<string, bufferlist>& rgw_attrs)
484 {
485 map<string, string> new_attrs;
486
487 /* merge send headers */
488 for (auto& attr: rgw_attrs) {
489 bufferlist& bl = attr.second;
490 const string& name = attr.first;
491 string val = bl.c_str();
492 if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
493 string header_name = RGW_AMZ_META_PREFIX;
494 header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
495 new_attrs[header_name] = val;
496 }
497 }
498
499 RGWAccessControlPolicy policy;
500 int ret = rgw_policy_from_attrset(cct, rgw_attrs, &policy);
501 if (ret < 0) {
502 ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
503 return ret;
504 }
505
506 set_http_attrs(new_attrs);
507 set_policy(policy);
508
509 return 0;
510 }
511
512 static std::set<string> keep_headers = { "content-type",
513 "content-encoding",
514 "content-disposition",
515 "content-language" };
516
517 void RGWRESTGenerateHTTPHeaders::set_http_attrs(const map<string, string>& http_attrs)
518 {
519 /* merge send headers */
520 for (auto& attr: http_attrs) {
521 const string& val = attr.second;
522 const string& name = lowercase_dash_http_attr(attr.first);
523 if (is_x_amz(name)) {
524 new_env->set(name, val);
525 new_info->x_meta_map[name] = val;
526 } else {
527 new_env->set(attr.first, val); /* Ugh, using the uppercase representation,
528 as the signing function calls info.env.get("CONTENT_TYPE").
529 This needs to be cleaned up! */
530 }
531 }
532 }
533
534 void RGWRESTGenerateHTTPHeaders::set_policy(RGWAccessControlPolicy& policy)
535 {
536 /* update acl headers */
537 RGWAccessControlList& acl = policy.get_acl();
538 multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
539 multimap<string, ACLGrant>::iterator giter;
540 map<int, string> grants_by_type;
541 for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
542 ACLGrant& grant = giter->second;
543 ACLPermission& perm = grant.get_permission();
544 grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
545 }
546 add_grants_headers(grants_by_type, *new_env, new_info->x_meta_map);
547 }
548
549 int RGWRESTGenerateHTTPHeaders::sign(RGWAccessKey& key)
550 {
551 int ret = sign_request(cct, key, *new_env, *new_info);
552 if (ret < 0) {
553 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
554 return ret;
555 }
556
557 return 0;
558 }
559
560 void RGWRESTStreamS3PutObj::send_init(rgw_obj& obj)
561 {
562 string resource_str;
563 string resource;
564 string new_url = url;
565
566 if (host_style == VirtualStyle) {
567 resource_str = obj.get_oid();
568 new_url = obj.bucket.name + "." + new_url;
569 } else {
570 resource_str = obj.bucket.name + "/" + obj.get_oid();
571 }
572
573 //do not encode slash in object key name
574 url_encode(resource_str, resource, false);
575
576 if (new_url[new_url.size() - 1] != '/')
577 new_url.append("/");
578
579 method = "PUT";
580 headers_gen.init(method, new_url, resource, params);
581
582 url = headers_gen.get_url();
583 }
584
585 int RGWRESTStreamS3PutObj::send_ready(RGWAccessKey& key, map<string, bufferlist>& rgw_attrs, bool send)
586 {
587 headers_gen.set_obj_attrs(rgw_attrs);
588
589 return send_ready(key, send);
590 }
591
592 int RGWRESTStreamS3PutObj::send_ready(RGWAccessKey& key, const map<string, string>& http_attrs,
593 RGWAccessControlPolicy& policy, bool send)
594 {
595 headers_gen.set_http_attrs(http_attrs);
596 headers_gen.set_policy(policy);
597
598 return send_ready(key, send);
599 }
600
601 int RGWRESTStreamS3PutObj::send_ready(RGWAccessKey& key, bool send)
602 {
603 headers_gen.sign(key);
604
605 for (const auto& kv: new_env.get_map()) {
606 headers.emplace_back(kv);
607 }
608
609 out_cb = new RGWRESTStreamOutCB(this);
610
611 if (send) {
612 int r = RGWHTTP::send(this);
613 if (r < 0)
614 return r;
615 }
616
617 return 0;
618 }
619
620 int RGWRESTStreamS3PutObj::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs, bool send)
621 {
622 send_init(obj);
623 return send_ready(key, attrs, send);
624 }
625
626 void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
627 {
628 map<string, string>::iterator iter = out_headers.find(header_name);
629 if (iter != out_headers.end()) {
630 str = iter->second;
631 } else {
632 str.clear();
633 }
634 }
635
636 static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
637 {
638 string err;
639 vector<string> vec;
640
641 get_str_vec(s, ".", vec);
642
643 if (vec.empty()) {
644 return -EINVAL;
645 }
646
647 long secs = strict_strtol(vec[0].c_str(), 10, &err);
648 long nsecs = 0;
649 if (!err.empty()) {
650 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
651 return -EINVAL;
652 }
653
654 if (vec.size() > 1) {
655 nsecs = strict_strtol(vec[1].c_str(), 10, &err);
656 if (!err.empty()) {
657 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
658 return -EINVAL;
659 }
660 }
661
662 *rt = utime_t(secs, nsecs).to_real_time();
663
664 return 0;
665 }
666
667 static void send_prepare_convert(const rgw_obj& obj, string *resource)
668 {
669 string urlsafe_bucket, urlsafe_object;
670 url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
671 url_encode(obj.key.name, urlsafe_object);
672 *resource = urlsafe_bucket + "/" + urlsafe_object;
673 }
674
675 int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
676 {
677 string resource;
678 send_prepare_convert(obj, &resource);
679
680 return send_request(&key, extra_headers, resource, mgr);
681 }
682
683 int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
684 {
685 string resource;
686 send_prepare_convert(obj, &resource);
687
688 return do_send_prepare(&key, extra_headers, resource);
689 }
690
691 int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
692 bufferlist *send_data)
693 {
694 string new_resource;
695 //do not encode slash
696 url_encode(resource, new_resource, false);
697
698 return do_send_prepare(key, extra_headers, new_resource, send_data);
699 }
700
701 int RGWRESTStreamRWRequest::do_send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
702 bufferlist *send_data)
703 {
704 string new_url = url;
705 if (new_url[new_url.size() - 1] != '/')
706 new_url.append("/");
707
708 RGWEnv new_env;
709 req_info new_info(cct, &new_env);
710
711 string new_resource;
712 string bucket_name;
713 string old_resource = resource;
714
715 if (resource[0] == '/') {
716 new_resource = resource.substr(1);
717 } else {
718 new_resource = resource;
719 }
720
721 size_t pos = new_resource.find("/");
722 bucket_name = new_resource.substr(0, pos);
723
724 //when dest is a bucket with out other params, uri should end up with '/'
725 if(pos == string::npos && params.size() == 0 && host_style == VirtualStyle) {
726 new_resource.append("/");
727 }
728
729 if (host_style == VirtualStyle) {
730 new_url = bucket_name + "." + new_url;
731 if(pos == string::npos) {
732 new_resource = "";
733 } else {
734 new_resource = new_resource.substr(pos+1);
735 }
736 }
737
738 RGWRESTGenerateHTTPHeaders headers_gen(cct, &new_env, &new_info);
739
740 headers_gen.init(method, new_url, new_resource, params);
741
742 headers_gen.set_http_attrs(extra_headers);
743
744 if (key) {
745 #if 0
746 new_info.init_meta_info(nullptr);
747 #endif
748
749 int ret = headers_gen.sign(*key);
750 if (ret < 0) {
751 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
752 return ret;
753 }
754 }
755
756 for (const auto& kv: new_env.get_map()) {
757 headers.emplace_back(kv);
758 }
759
760 if (send_data) {
761 set_send_length(send_data->length());
762 set_outbl(*send_data);
763 set_send_data_hint(true);
764 }
765
766
767 method = new_info.method;
768 url = headers_gen.get_url();
769
770 return 0;
771 }
772
773 int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
774 RGWHTTPManager *mgr, bufferlist *send_data)
775 {
776 int ret = send_prepare(key, extra_headers, resource, send_data);
777 if (ret < 0) {
778 return ret;
779 }
780
781 return send(mgr);
782 }
783
784
785 int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
786 {
787 if (!mgr) {
788 return RGWHTTP::send(this);
789 }
790
791 int r = mgr->add_request(this);
792 if (r < 0)
793 return r;
794
795 return 0;
796 }
797
798 int RGWRESTStreamRWRequest::complete_request(string *etag,
799 real_time *mtime,
800 uint64_t *psize,
801 map<string, string> *pattrs,
802 map<string, string> *pheaders)
803 {
804 int ret = wait(null_yield);
805 if (ret < 0) {
806 return ret;
807 }
808
809 unique_lock guard(out_headers_lock);
810
811 if (etag) {
812 set_str_from_headers(out_headers, "ETAG", *etag);
813 }
814 if (status >= 0) {
815 if (mtime) {
816 string mtime_str;
817 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
818 if (!mtime_str.empty()) {
819 int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
820 if (ret < 0) {
821 return ret;
822 }
823 } else {
824 *mtime = real_time();
825 }
826 }
827 if (psize) {
828 string size_str;
829 set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
830 string err;
831 *psize = strict_strtoll(size_str.c_str(), 10, &err);
832 if (!err.empty()) {
833 ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
834 return -EIO;
835 }
836 }
837 }
838
839 for (auto iter = out_headers.begin(); pattrs && iter != out_headers.end(); ++iter) {
840 const string& attr_name = iter->first;
841 if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
842 string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
843 const char *src = name.c_str();
844 char buf[name.size() + 1];
845 char *dest = buf;
846 for (; *src; ++src, ++dest) {
847 switch(*src) {
848 case '_':
849 *dest = '-';
850 break;
851 default:
852 *dest = tolower(*src);
853 }
854 }
855 *dest = '\0';
856 (*pattrs)[buf] = iter->second;
857 }
858 }
859
860 if (pheaders) {
861 *pheaders = std::move(out_headers);
862 }
863 return status;
864 }
865
866 int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val)
867 {
868 if (name == "RGWX_EMBEDDED_METADATA_LEN") {
869 string err;
870 long len = strict_strtol(val.c_str(), 10, &err);
871 if (!err.empty()) {
872 ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
873 return -EINVAL;
874 }
875
876 cb->set_extra_data_len(len);
877 }
878 return 0;
879 }
880
881 int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
882 {
883 size_t orig_len = len;
884
885 if (cb) {
886 in_data.append((const char *)ptr, len);
887
888 size_t orig_in_data_len = in_data.length();
889
890 int ret = cb->handle_data(in_data, pause);
891 if (ret < 0)
892 return ret;
893 if (ret == 0) {
894 in_data.clear();
895 } else {
896 /* partial read */
897 ceph_assert(in_data.length() <= orig_in_data_len);
898 len = ret;
899 bufferlist bl;
900 size_t left_to_read = orig_in_data_len - len;
901 if (in_data.length() > left_to_read) {
902 in_data.splice(0, in_data.length() - left_to_read, &bl);
903 }
904 }
905 }
906 ofs += len;
907 return orig_len;
908 }
909
910 void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
911 std::lock_guard wl{write_lock};
912 stream_writes = s;
913 }
914
915 void RGWHTTPStreamRWRequest::unpause_receive()
916 {
917 std::lock_guard req_locker{get_req_lock()};
918 if (!read_paused) {
919 _set_read_paused(false);
920 }
921 }
922
923 void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
924 {
925 std::scoped_lock locker{get_req_lock(), write_lock};
926 outbl.claim_append(bl);
927 _set_write_paused(false);
928 }
929
930 uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
931 {
932 std::lock_guard wl{write_lock};
933 return outbl.length();
934 }
935
936 void RGWHTTPStreamRWRequest::finish_write()
937 {
938 std::scoped_lock locker{get_req_lock(), write_lock};
939 write_stream_complete = true;
940 _set_write_paused(false);
941 }
942
943 int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
944 {
945 uint64_t out_len;
946 uint64_t send_size;
947 {
948 std::lock_guard wl{write_lock};
949
950 if (outbl.length() == 0) {
951 if ((stream_writes && !write_stream_complete) ||
952 (write_ofs < send_len)) {
953 *pause = true;
954 }
955 return 0;
956 }
957
958 len = std::min(len, (size_t)outbl.length());
959
960 bufferlist bl;
961 outbl.splice(0, len, &bl);
962 send_size = bl.length();
963 if (send_size > 0) {
964 memcpy(ptr, bl.c_str(), send_size);
965 write_ofs += send_size;
966 }
967
968 out_len = outbl.length();
969 }
970 /* don't need to be under write_lock here, avoid deadlocks in case notify callback
971 * needs to lock */
972 if (write_drain_cb) {
973 write_drain_cb->notify(out_len);
974 }
975 return send_size;
976 }
977
978 class StreamIntoBufferlist : public RGWGetDataCB {
979 bufferlist& bl;
980 public:
981 explicit StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
982 int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
983 bl.claim_append(inbl);
984 return bl_len;
985 }
986 };
987