]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_rest_client.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw_common.h"
5#include "rgw_rest_client.h"
6#include "rgw_auth_s3.h"
7#include "rgw_http_errors.h"
8#include "rgw_rados.h"
9
10#include "common/ceph_crypto_cms.h"
11#include "common/armor.h"
12#include "common/strtol.h"
13#include "include/str_list.h"
14#include "rgw_crypt_sanitize.h"
15
16#define dout_context g_ceph_context
17#define dout_subsys ceph_subsys_rgw
18
19int RGWRESTSimpleRequest::get_status()
20{
21 int retcode = get_req_retcode();
22 if (retcode < 0) {
23 return retcode;
24 }
25 return status;
26}
27
28int RGWRESTSimpleRequest::handle_header(const string& name, const string& val)
29{
30 if (name == "CONTENT_LENGTH") {
31 string err;
32 long len = strict_strtol(val.c_str(), 10, &err);
33 if (!err.empty()) {
34 ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
35 return -EINVAL;
36 }
37
38 max_response = len;
39 }
40
41 return 0;
42}
43
44int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
45{
46 char line[len + 1];
47
48 char *s = (char *)ptr, *end = (char *)ptr + len;
49 char *p = line;
50 ldout(cct, 10) << "receive_http_header" << dendl;
51
52 while (s != end) {
53 if (*s == '\r') {
54 s++;
55 continue;
56 }
57 if (*s == '\n') {
58 *p = '\0';
59 ldout(cct, 10) << "received header:" << line << dendl;
60 // TODO: fill whatever data required here
61 char *l = line;
62 char *tok = strsep(&l, " \t:");
63 if (tok && l) {
64 while (*l == ' ')
65 l++;
66
67 if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
68 http_status = atoi(l);
69 if (http_status == 100) /* 100-continue response */
70 continue;
71 status = rgw_http_error_to_errno(http_status);
72 } else {
73 /* convert header field name to upper case */
74 char *src = tok;
75 char buf[len + 1];
76 size_t i;
77 for (i = 0; i < len && *src; ++i, ++src) {
78 switch (*src) {
79 case '-':
80 buf[i] = '_';
81 break;
82 default:
83 buf[i] = toupper(*src);
84 }
85 }
86 buf[i] = '\0';
87 out_headers[buf] = l;
88 int r = handle_header(buf, l);
89 if (r < 0)
90 return r;
91 }
92 }
93 }
94 if (s != end)
95 *p++ = *s++;
96 }
97 return 0;
98}
99
100static void get_new_date_str(string& date_str)
101{
102 utime_t tm = ceph_clock_now();
103 stringstream s;
104 tm.asctime(s);
105 date_str = s.str();
106}
107
108int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
109{
110 string new_url = url;
111 string new_resource = resource;
112
113 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
114 new_url = new_url.substr(0, new_url.size() - 1);
115 } else if (resource[0] != '/') {
116 new_resource = "/";
117 new_resource.append(resource);
118 }
119 new_url.append(new_resource);
120
121 string date_str;
122 get_new_date_str(date_str);
123 headers.push_back(pair<string, string>("HTTP_DATE", date_str));
124
125 string canonical_header;
126 map<string, string> meta_map;
127 map<string, string> sub_resources;
128 rgw_create_s3_canonical_header(method, NULL, NULL, date_str.c_str(),
129 meta_map, new_url.c_str(), sub_resources,
130 canonical_header);
131
132 string digest;
133 int ret = rgw_get_s3_header_digest(canonical_header, key.key, digest);
134 if (ret < 0) {
135 return ret;
136 }
137
138 string auth_hdr = "AWS " + key.id + ":" + digest;
139
140 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
141
142 headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
143 int r = process(method, new_url.c_str());
144 if (r < 0)
145 return r;
146
147 return status;
148}
149
150int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
151{
152 if (!send_iter)
153 return 0;
154
155 if (len > send_iter->get_remaining())
156 len = send_iter->get_remaining();
157
158 send_iter->copy(len, (char *)ptr);
159
160 return len;
161}
162
163int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
164{
165 size_t cp_len, left_len;
166
167 left_len = max_response > response.length() ? (max_response - response.length()) : 0;
168 if (left_len == 0)
169 return 0; /* don't read extra data */
170
171 cp_len = (len > left_len) ? left_len : len;
172 bufferptr p((char *)ptr, cp_len);
173
174 response.append(p);
175
176 return 0;
177
178}
179
180void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
181{
182 if (dest.empty()) {
183 dest.append("?");
184 } else {
185 dest.append("&");
186 }
187 string url_name;
188 url_encode(name, url_name);
189 dest.append(url_name);
190
191 if (!val.empty()) {
192 string url_val;
193 url_encode(val, url_val);
194 dest.append("=");
195 dest.append(url_val);
196 }
197}
198
199void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
200{
201 map<string, string>::iterator miter;
202 for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
203 append_param(dest, miter->first, miter->second);
204 }
205 param_vec_t::iterator iter;
206 for (iter = params.begin(); iter != params.end(); ++iter) {
207 append_param(dest, iter->first, iter->second);
208 }
209}
210
211int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
212{
213 /* don't sign if no key is provided */
214 if (key.key.empty()) {
215 return 0;
216 }
217
218 map<string, string, ltstr_nocase>& m = env.get_map();
219
220 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
221 map<string, string>::iterator i;
222 for (i = m.begin(); i != m.end(); ++i) {
223 ldout(cct, 20) << "> " << i->first << " -> " << rgw::crypt_sanitize::x_meta_map{i->first, i->second} << dendl;
224 }
225 }
226
227 string canonical_header;
228 if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
229 ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
230 return -EINVAL;
231 }
232
233 ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
234
235 string digest;
236 int ret = rgw_get_s3_header_digest(canonical_header, key.key, digest);
237 if (ret < 0) {
238 return ret;
239 }
240
241 string auth_hdr = "AWS " + key.id + ":" + digest;
242 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
243
244 m["AUTHORIZATION"] = auth_hdr;
245
246 return 0;
247}
248
249int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
250{
251
252 string date_str;
253 get_new_date_str(date_str);
254
255 RGWEnv new_env;
256 req_info new_info(cct, &new_env);
257 new_info.rebuild_from(info);
258
259 new_env.set("HTTP_DATE", date_str.c_str());
260
261 int ret = sign_request(key, new_env, new_info);
262 if (ret < 0) {
263 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
264 return ret;
265 }
266
267 map<string, string, ltstr_nocase>& m = new_env.get_map();
268 map<string, string>::iterator iter;
269 for (iter = m.begin(); iter != m.end(); ++iter) {
270 headers.push_back(pair<string, string>(iter->first, iter->second));
271 }
272
273 map<string, string>& meta_map = new_info.x_meta_map;
274 for (iter = meta_map.begin(); iter != meta_map.end(); ++iter) {
275 headers.push_back(pair<string, string>(iter->first, iter->second));
276 }
277
278 string params_str;
279 get_params_str(info.args.get_params(), params_str);
280
281 string new_url = url;
282 string& resource = new_info.request_uri;
283 string new_resource = resource;
284 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
285 new_url = new_url.substr(0, new_url.size() - 1);
286 } else if (resource[0] != '/') {
287 new_resource = "/";
288 new_resource.append(resource);
289 }
290 new_url.append(new_resource + params_str);
291
292 bufferlist::iterator bliter;
293
294 if (inbl) {
295 bliter = inbl->begin();
296 send_iter = &bliter;
297
298 set_send_length(inbl->length());
299 }
300
301 int r = process(new_info.method, new_url.c_str());
302 if (r < 0){
303 if (r == -EINVAL){
304 // curl_easy has errored, generally means the service is not available
305 r = -ERR_SERVICE_UNAVAILABLE;
306 }
307 return r;
308 }
309
310 response.append((char)0); /* NULL terminate response */
311
312 if (outbl) {
313 outbl->claim(response);
314 }
315
316 return status;
317}
318
319class RGWRESTStreamOutCB : public RGWGetDataCB {
320 RGWRESTStreamWriteRequest *req;
321public:
322 explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
323 int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
324};
325
326int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
327{
328 dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
329 if (!bl_ofs && bl_len == bl.length()) {
330 return req->add_output_data(bl);
331 }
332
333 bufferptr bp(bl.c_str() + bl_ofs, bl_len);
334 bufferlist new_bl;
335 new_bl.push_back(bp);
336
337 return req->add_output_data(new_bl);
338}
339
340RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
341{
342 delete cb;
343}
344
345int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
346{
347 lock.Lock();
348 if (status < 0) {
349 int ret = status;
350 lock.Unlock();
351 return ret;
352 }
353 pending_send.push_back(bl);
354 lock.Unlock();
355
356 bool done;
357 return http_manager.process_requests(false, &done);
358}
359
360static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
361{
362 string& s = grants_by_type[perm];
363
364 if (!s.empty())
365 s.append(", ");
366
367 string id_type_str;
368 ACLGranteeType& type = grant.get_type();
369 switch (type.get_type()) {
370 case ACL_TYPE_GROUP:
371 id_type_str = "uri";
372 break;
373 case ACL_TYPE_EMAIL_USER:
374 id_type_str = "emailAddress";
375 break;
376 default:
377 id_type_str = "id";
378 }
379 rgw_user id;
380 grant.get_id(id);
381 s.append(id_type_str + "=\"" + id.to_str() + "\"");
382}
383
384struct grant_type_to_header {
385 int type;
386 const char *header;
387};
388
389struct grant_type_to_header grants_headers_def[] = {
390 { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
391 { RGW_PERM_READ, "x-amz-grant-read"},
392 { RGW_PERM_WRITE, "x-amz-grant-write"},
393 { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"},
394 { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"},
395 { 0, NULL}
396};
397
398static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
399{
400 if ((perm & check_perm) == perm) {
401 grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
402 return true;
403 }
404 return false;
405}
406
407static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
408{
409 struct grant_type_to_header *t;
410
411 for (t = grants_headers_def; t->header; t++) {
412 if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
413 return;
414 }
415}
416
417static void add_grants_headers(map<int, string>& grants, map<string, string, ltstr_nocase>& attrs, map<string, string>& meta_map)
418{
419 struct grant_type_to_header *t;
420
421 for (t = grants_headers_def; t->header; t++) {
422 map<int, string>::iterator iter = grants.find(t->type);
423 if (iter != grants.end()) {
424 attrs[t->header] = iter->second;
425 meta_map[t->header] = iter->second;
426 }
427 }
428}
429
430int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
431{
432 string resource = obj.bucket.name + "/" + obj.get_oid();
433 string new_url = url;
434 if (new_url[new_url.size() - 1] != '/')
435 new_url.append("/");
436
437 string date_str;
438 get_new_date_str(date_str);
439
440 RGWEnv new_env;
441 req_info new_info(cct, &new_env);
442
443 string params_str;
444 map<string, string>& args = new_info.args.get_params();
445 get_params_str(args, params_str);
446
447 new_url.append(resource + params_str);
448
449 new_env.set("HTTP_DATE", date_str.c_str());
450
451 new_info.method = "PUT";
452
453 new_info.script_uri = "/";
454 new_info.script_uri.append(resource);
455 new_info.request_uri = new_info.script_uri;
456
457 map<string, string, ltstr_nocase>& m = new_env.get_map();
458 map<string, bufferlist>::iterator bliter;
459
460 /* merge send headers */
461 for (bliter = attrs.begin(); bliter != attrs.end(); ++bliter) {
462 bufferlist& bl = bliter->second;
463 const string& name = bliter->first;
464 string val = bl.c_str();
465 if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
466 string header_name = RGW_AMZ_META_PREFIX;
467 header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
468 m[header_name] = val;
469 new_info.x_meta_map[header_name] = val;
470 }
471 }
472 RGWAccessControlPolicy policy;
473 int ret = rgw_policy_from_attrset(cct, attrs, &policy);
474 if (ret < 0) {
475 ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
476 return ret;
477 }
478
479 /* update acl headers */
480 RGWAccessControlList& acl = policy.get_acl();
481 multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
482 multimap<string, ACLGrant>::iterator giter;
483 map<int, string> grants_by_type;
484 for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
485 ACLGrant& grant = giter->second;
486 ACLPermission& perm = grant.get_permission();
487 grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
488 }
489 add_grants_headers(grants_by_type, m, new_info.x_meta_map);
490 ret = sign_request(key, new_env, new_info);
491 if (ret < 0) {
492 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
493 return ret;
494 }
495
496 map<string, string>::iterator iter;
497 for (iter = m.begin(); iter != m.end(); ++iter) {
498 headers.push_back(pair<string, string>(iter->first, iter->second));
499 }
500
501 cb = new RGWRESTStreamOutCB(this);
502
503 set_send_length(obj_size);
504
505 int r = http_manager.add_request(this, new_info.method, new_url.c_str());
506 if (r < 0)
507 return r;
508
509 return 0;
510}
511
512int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
513{
514 uint64_t sent = 0;
515
516 dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
517 lock.Lock();
518 if (pending_send.empty() || status < 0) {
519 lock.Unlock();
520 return status;
521 }
522
523 list<bufferlist>::iterator iter = pending_send.begin();
524 while (iter != pending_send.end() && len > 0) {
525 bufferlist& bl = *iter;
526
527 list<bufferlist>::iterator next_iter = iter;
528 ++next_iter;
529 lock.Unlock();
530
531 uint64_t send_len = min(len, (size_t)bl.length());
532
533 memcpy(ptr, bl.c_str(), send_len);
534
535 ptr = (char *)ptr + send_len;
536 len -= send_len;
537 sent += send_len;
538
539 lock.Lock();
540
541 bufferlist new_bl;
542 if (bl.length() > send_len) {
543 bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
544 new_bl.append(bp);
545 }
546 pending_send.pop_front(); /* need to do this after we copy data from bl */
547 if (new_bl.length()) {
548 pending_send.push_front(new_bl);
549 }
550 iter = next_iter;
551 }
552 lock.Unlock();
553
554 return sent;
555}
556
557
558void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
559{
560 map<string, string>::iterator iter = out_headers.find(header_name);
561 if (iter != out_headers.end()) {
562 str = iter->second;
563 } else {
564 str.clear();
565 }
566}
567
568static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
569{
570 string err;
571 vector<string> vec;
572
573 get_str_vec(s, ".", vec);
574
575 if (vec.empty()) {
576 return -EINVAL;
577 }
578
579 long secs = strict_strtol(vec[0].c_str(), 10, &err);
580 long nsecs = 0;
581 if (!err.empty()) {
582 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
583 return -EINVAL;
584 }
585
586 if (vec.size() > 1) {
587 nsecs = strict_strtol(vec[1].c_str(), 10, &err);
588 if (!err.empty()) {
589 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
590 return -EINVAL;
591 }
592 }
593
594 *rt = utime_t(secs, nsecs).to_real_time();
595
596 return 0;
597}
598
599int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
600{
601 int ret = http_manager.complete_requests();
602 if (ret < 0)
603 return ret;
604
605 set_str_from_headers(out_headers, "ETAG", etag);
606
607 if (mtime) {
608 string mtime_str;
609 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
610
611 ret = parse_rgwx_mtime(cct, mtime_str, mtime);
612 if (ret < 0) {
613 return ret;
614 }
615 }
616 return status;
617}
618
619int RGWRESTStreamRWRequest::get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
620{
621 string urlsafe_bucket, urlsafe_object;
622 url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
623 url_encode(obj.key.name, urlsafe_object);
624 string resource = urlsafe_bucket + "/" + urlsafe_object;
625
626 return get_resource(key, extra_headers, resource);
627}
628
629int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr)
630{
631 string new_url = url;
632 if (new_url[new_url.size() - 1] != '/')
633 new_url.append("/");
634
635 string date_str;
636 get_new_date_str(date_str);
637
638 RGWEnv new_env;
639 req_info new_info(cct, &new_env);
640
641 string params_str;
642 map<string, string>& args = new_info.args.get_params();
643 get_params_str(args, params_str);
644
645 /* merge params with extra args so that we can sign correctly */
646 for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
647 new_info.args.append(iter->first, iter->second);
648 }
649
650 string new_resource;
651 if (resource[0] == '/') {
652 new_resource = resource.substr(1);
653 } else {
654 new_resource = resource;
655 }
656
657 new_url.append(new_resource + params_str);
658
659 new_env.set("HTTP_DATE", date_str.c_str());
660
661 for (map<string, string>::iterator iter = extra_headers.begin();
662 iter != extra_headers.end(); ++iter) {
663 new_env.set(iter->first.c_str(), iter->second.c_str());
664 }
665
666 new_info.method = method;
667
668 new_info.script_uri = "/";
669 new_info.script_uri.append(new_resource);
670 new_info.request_uri = new_info.script_uri;
671
672 new_info.init_meta_info(NULL);
673
674 int ret = sign_request(key, new_env, new_info);
675 if (ret < 0) {
676 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
677 return ret;
678 }
679
680 map<string, string, ltstr_nocase>& m = new_env.get_map();
681 map<string, string>::iterator iter;
682 for (iter = m.begin(); iter != m.end(); ++iter) {
683 headers.push_back(pair<string, string>(iter->first, iter->second));
684 }
685
686 RGWHTTPManager *pmanager = &http_manager;
687 if (mgr) {
688 pmanager = mgr;
689 }
690
691 int r = pmanager->add_request(this, new_info.method, new_url.c_str());
692 if (r < 0)
693 return r;
694
695 if (!mgr) {
696 r = pmanager->complete_requests();
697 if (r < 0)
698 return r;
699 }
700
701 return 0;
702}
703
704int RGWRESTStreamRWRequest::complete(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
705{
706 set_str_from_headers(out_headers, "ETAG", etag);
707 if (status >= 0) {
708 if (mtime) {
709 string mtime_str;
710 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
711 if (!mtime_str.empty()) {
712 int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
713 if (ret < 0) {
714 return ret;
715 }
716 } else {
717 *mtime = real_time();
718 }
719 }
720 if (psize) {
721 string size_str;
722 set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
723 string err;
724 *psize = strict_strtoll(size_str.c_str(), 10, &err);
725 if (!err.empty()) {
726 ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
727 return -EIO;
728 }
729 }
730 }
731
732 map<string, string>::iterator iter;
733 for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
734 const string& attr_name = iter->first;
735 if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
736 string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
737 const char *src = name.c_str();
738 char buf[name.size() + 1];
739 char *dest = buf;
740 for (; *src; ++src, ++dest) {
741 switch(*src) {
742 case '_':
743 *dest = '-';
744 break;
745 default:
746 *dest = tolower(*src);
747 }
748 }
749 *dest = '\0';
750 attrs[buf] = iter->second;
751 }
752 }
753 return status;
754}
755
756int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
757{
758 if (name == "RGWX_EMBEDDED_METADATA_LEN") {
759 string err;
760 long len = strict_strtol(val.c_str(), 10, &err);
761 if (!err.empty()) {
762 ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
763 return -EINVAL;
764 }
765
766 cb->set_extra_data_len(len);
767 }
768 return 0;
769}
770
771int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
772{
773 bufferptr bp((const char *)ptr, len);
774 bufferlist bl;
775 bl.append(bp);
776 int ret = cb->handle_data(bl, ofs, len);
777 if (ret < 0)
778 return ret;
779 ofs += len;
780 return len;
781}
782
783int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
784{
785 if (outbl.length() == 0) {
786 return 0;
787 }
788
789 uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
790 if (send_size > 0) {
791 memcpy(ptr, outbl.c_str() + write_ofs, send_size);
792 write_ofs += send_size;
793 }
794 return send_size;
795}
796
797class StreamIntoBufferlist : public RGWGetDataCB {
798 bufferlist& bl;
799public:
800 StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
801 int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
802 bl.claim_append(inbl);
803 return bl_len;
804 }
805};
806