]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_client.cc
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_common.h"
5 #include "rgw_rest_client.h"
6 #include "rgw_auth_s3.h"
7 #include "rgw_http_errors.h"
8 #include "rgw_rados.h"
9
10 #include "common/ceph_crypto_cms.h"
11 #include "common/armor.h"
12 #include "common/strtol.h"
13 #include "include/str_list.h"
14 #include "rgw_crypt_sanitize.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rgw
18
19 int RGWRESTSimpleRequest::get_status()
20 {
21 int retcode = get_req_retcode();
22 if (retcode < 0) {
23 return retcode;
24 }
25 return status;
26 }
27
28 int RGWRESTSimpleRequest::handle_header(const string& name, const string& val)
29 {
30 if (name == "CONTENT_LENGTH") {
31 string err;
32 long len = strict_strtol(val.c_str(), 10, &err);
33 if (!err.empty()) {
34 ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
35 return -EINVAL;
36 }
37
38 max_response = len;
39 }
40
41 return 0;
42 }
43
44 int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
45 {
46 char line[len + 1];
47
48 char *s = (char *)ptr, *end = (char *)ptr + len;
49 char *p = line;
50 ldout(cct, 10) << "receive_http_header" << dendl;
51
52 while (s != end) {
53 if (*s == '\r') {
54 s++;
55 continue;
56 }
57 if (*s == '\n') {
58 *p = '\0';
59 ldout(cct, 10) << "received header:" << line << dendl;
60 // TODO: fill whatever data required here
61 char *l = line;
62 char *tok = strsep(&l, " \t:");
63 if (tok && l) {
64 while (*l == ' ')
65 l++;
66
67 if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
68 http_status = atoi(l);
69 if (http_status == 100) /* 100-continue response */
70 continue;
71 status = rgw_http_error_to_errno(http_status);
72 } else {
73 /* convert header field name to upper case */
74 char *src = tok;
75 char buf[len + 1];
76 size_t i;
77 for (i = 0; i < len && *src; ++i, ++src) {
78 switch (*src) {
79 case '-':
80 buf[i] = '_';
81 break;
82 default:
83 buf[i] = toupper(*src);
84 }
85 }
86 buf[i] = '\0';
87 out_headers[buf] = l;
88 int r = handle_header(buf, l);
89 if (r < 0)
90 return r;
91 }
92 }
93 }
94 if (s != end)
95 *p++ = *s++;
96 }
97 return 0;
98 }
99
100 static void get_new_date_str(string& date_str)
101 {
102 utime_t tm = ceph_clock_now();
103 stringstream s;
104 tm.asctime(s);
105 date_str = s.str();
106 }
107
108 int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
109 {
110 string new_url = url;
111 string new_resource = resource;
112
113 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
114 new_url = new_url.substr(0, new_url.size() - 1);
115 } else if (resource[0] != '/') {
116 new_resource = "/";
117 new_resource.append(resource);
118 }
119 new_url.append(new_resource);
120
121 string date_str;
122 get_new_date_str(date_str);
123 headers.push_back(pair<string, string>("HTTP_DATE", date_str));
124
125 string canonical_header;
126 map<string, string> meta_map;
127 map<string, string> sub_resources;
128 rgw_create_s3_canonical_header(method, NULL, NULL, date_str.c_str(),
129 meta_map, new_url.c_str(), sub_resources,
130 canonical_header);
131
132 string digest;
133 try {
134 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
135 } catch (int ret) {
136 return ret;
137 }
138
139 string auth_hdr = "AWS " + key.id + ":" + digest;
140
141 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
142
143 headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
144 int r = process(method, new_url.c_str());
145 if (r < 0)
146 return r;
147
148 return status;
149 }
150
151 int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
152 {
153 if (!send_iter)
154 return 0;
155
156 if (len > send_iter->get_remaining())
157 len = send_iter->get_remaining();
158
159 send_iter->copy(len, (char *)ptr);
160
161 return len;
162 }
163
164 int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
165 {
166 size_t cp_len, left_len;
167
168 left_len = max_response > response.length() ? (max_response - response.length()) : 0;
169 if (left_len == 0)
170 return 0; /* don't read extra data */
171
172 cp_len = (len > left_len) ? left_len : len;
173 bufferptr p((char *)ptr, cp_len);
174
175 response.append(p);
176
177 return 0;
178
179 }
180
181 void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
182 {
183 if (dest.empty()) {
184 dest.append("?");
185 } else {
186 dest.append("&");
187 }
188 string url_name;
189 url_encode(name, url_name);
190 dest.append(url_name);
191
192 if (!val.empty()) {
193 string url_val;
194 url_encode(val, url_val);
195 dest.append("=");
196 dest.append(url_val);
197 }
198 }
199
200 void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
201 {
202 map<string, string>::iterator miter;
203 for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
204 append_param(dest, miter->first, miter->second);
205 }
206 param_vec_t::iterator iter;
207 for (iter = params.begin(); iter != params.end(); ++iter) {
208 append_param(dest, iter->first, iter->second);
209 }
210 }
211
212 int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
213 {
214 /* don't sign if no key is provided */
215 if (key.key.empty()) {
216 return 0;
217 }
218
219 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
220 for (const auto& i: env.get_map()) {
221 ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
222 }
223 }
224
225 string canonical_header;
226 if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
227 ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
228 return -EINVAL;
229 }
230
231 ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
232
233 string digest;
234 try {
235 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
236 } catch (int ret) {
237 return ret;
238 }
239
240 string auth_hdr = "AWS " + key.id + ":" + digest;
241 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
242
243 env.set("AUTHORIZATION", auth_hdr);
244
245 return 0;
246 }
247
248 int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
249 {
250
251 string date_str;
252 get_new_date_str(date_str);
253
254 RGWEnv new_env;
255 req_info new_info(cct, &new_env);
256 new_info.rebuild_from(info);
257
258 new_env.set("HTTP_DATE", date_str.c_str());
259
260 int ret = sign_request(key, new_env, new_info);
261 if (ret < 0) {
262 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
263 return ret;
264 }
265
266 for (const auto& kv: new_env.get_map()) {
267 headers.emplace_back(kv);
268 }
269
270 map<string, string>& meta_map = new_info.x_meta_map;
271 for (const auto& kv: meta_map) {
272 headers.emplace_back(kv);
273 }
274
275 string params_str;
276 get_params_str(info.args.get_params(), params_str);
277
278 string new_url = url;
279 string& resource = new_info.request_uri;
280 string new_resource = resource;
281 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
282 new_url = new_url.substr(0, new_url.size() - 1);
283 } else if (resource[0] != '/') {
284 new_resource = "/";
285 new_resource.append(resource);
286 }
287 new_url.append(new_resource + params_str);
288
289 bufferlist::iterator bliter;
290
291 if (inbl) {
292 bliter = inbl->begin();
293 send_iter = &bliter;
294
295 set_send_length(inbl->length());
296 }
297
298 int r = process(new_info.method, new_url.c_str());
299 if (r < 0){
300 if (r == -EINVAL){
301 // curl_easy has errored, generally means the service is not available
302 r = -ERR_SERVICE_UNAVAILABLE;
303 }
304 return r;
305 }
306
307 response.append((char)0); /* NULL terminate response */
308
309 if (outbl) {
310 outbl->claim(response);
311 }
312
313 return status;
314 }
315
316 class RGWRESTStreamOutCB : public RGWGetDataCB {
317 RGWRESTStreamWriteRequest *req;
318 public:
319 explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
320 int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
321 };
322
323 int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
324 {
325 dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
326 if (!bl_ofs && bl_len == bl.length()) {
327 return req->add_output_data(bl);
328 }
329
330 bufferptr bp(bl.c_str() + bl_ofs, bl_len);
331 bufferlist new_bl;
332 new_bl.push_back(bp);
333
334 return req->add_output_data(new_bl);
335 }
336
337 RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
338 {
339 delete cb;
340 }
341
342 int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
343 {
344 lock.Lock();
345 if (status < 0) {
346 int ret = status;
347 lock.Unlock();
348 return ret;
349 }
350 pending_send.push_back(bl);
351 lock.Unlock();
352
353 bool done;
354 return http_manager.process_requests(false, &done);
355 }
356
357 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
358 {
359 string& s = grants_by_type[perm];
360
361 if (!s.empty())
362 s.append(", ");
363
364 string id_type_str;
365 ACLGranteeType& type = grant.get_type();
366 switch (type.get_type()) {
367 case ACL_TYPE_GROUP:
368 id_type_str = "uri";
369 break;
370 case ACL_TYPE_EMAIL_USER:
371 id_type_str = "emailAddress";
372 break;
373 default:
374 id_type_str = "id";
375 }
376 rgw_user id;
377 grant.get_id(id);
378 s.append(id_type_str + "=\"" + id.to_str() + "\"");
379 }
380
381 struct grant_type_to_header {
382 int type;
383 const char *header;
384 };
385
386 struct grant_type_to_header grants_headers_def[] = {
387 { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
388 { RGW_PERM_READ, "x-amz-grant-read"},
389 { RGW_PERM_WRITE, "x-amz-grant-write"},
390 { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"},
391 { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"},
392 { 0, NULL}
393 };
394
395 static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
396 {
397 if ((perm & check_perm) == perm) {
398 grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
399 return true;
400 }
401 return false;
402 }
403
404 static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
405 {
406 struct grant_type_to_header *t;
407
408 for (t = grants_headers_def; t->header; t++) {
409 if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
410 return;
411 }
412 }
413
414 static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
415 {
416 struct grant_type_to_header *t;
417
418 for (t = grants_headers_def; t->header; t++) {
419 map<int, string>::iterator iter = grants.find(t->type);
420 if (iter != grants.end()) {
421 env.set(t->header,iter->second);
422 meta_map[t->header] = iter->second;
423 }
424 }
425 }
426
427 int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
428 {
429 string resource = obj.bucket.name + "/" + obj.get_oid();
430 string new_url = url;
431 if (new_url[new_url.size() - 1] != '/')
432 new_url.append("/");
433
434 string date_str;
435 get_new_date_str(date_str);
436
437 RGWEnv new_env;
438 req_info new_info(cct, &new_env);
439
440 string params_str;
441 map<string, string>& args = new_info.args.get_params();
442 get_params_str(args, params_str);
443
444 new_url.append(resource + params_str);
445
446 new_env.set("HTTP_DATE", date_str.c_str());
447
448 new_info.method = "PUT";
449
450 new_info.script_uri = "/";
451 new_info.script_uri.append(resource);
452 new_info.request_uri = new_info.script_uri;
453
454 /* merge send headers */
455 for (auto& attr: attrs) {
456 bufferlist& bl = attr.second;
457 const string& name = attr.first;
458 string val = bl.c_str();
459 if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
460 string header_name = RGW_AMZ_META_PREFIX;
461 header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
462 new_env.set(header_name, val);
463 new_info.x_meta_map[header_name] = val;
464 }
465 }
466 RGWAccessControlPolicy policy;
467 int ret = rgw_policy_from_attrset(cct, attrs, &policy);
468 if (ret < 0) {
469 ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
470 return ret;
471 }
472
473 /* update acl headers */
474 RGWAccessControlList& acl = policy.get_acl();
475 multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
476 multimap<string, ACLGrant>::iterator giter;
477 map<int, string> grants_by_type;
478 for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
479 ACLGrant& grant = giter->second;
480 ACLPermission& perm = grant.get_permission();
481 grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
482 }
483 add_grants_headers(grants_by_type, new_env, new_info.x_meta_map);
484 ret = sign_request(key, new_env, new_info);
485 if (ret < 0) {
486 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
487 return ret;
488 }
489
490 for (const auto& kv: new_env.get_map()) {
491 headers.emplace_back(kv);
492 }
493
494 cb = new RGWRESTStreamOutCB(this);
495
496 set_send_length(obj_size);
497
498 int r = http_manager.add_request(this, new_info.method, new_url.c_str());
499 if (r < 0)
500 return r;
501
502 return 0;
503 }
504
505 int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
506 {
507 uint64_t sent = 0;
508
509 dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
510 lock.Lock();
511 if (pending_send.empty() || status < 0) {
512 lock.Unlock();
513 return status;
514 }
515
516 list<bufferlist>::iterator iter = pending_send.begin();
517 while (iter != pending_send.end() && len > 0) {
518 bufferlist& bl = *iter;
519
520 list<bufferlist>::iterator next_iter = iter;
521 ++next_iter;
522 lock.Unlock();
523
524 uint64_t send_len = min(len, (size_t)bl.length());
525
526 memcpy(ptr, bl.c_str(), send_len);
527
528 ptr = (char *)ptr + send_len;
529 len -= send_len;
530 sent += send_len;
531
532 lock.Lock();
533
534 bufferlist new_bl;
535 if (bl.length() > send_len) {
536 bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
537 new_bl.append(bp);
538 }
539 pending_send.pop_front(); /* need to do this after we copy data from bl */
540 if (new_bl.length()) {
541 pending_send.push_front(new_bl);
542 }
543 iter = next_iter;
544 }
545 lock.Unlock();
546
547 return sent;
548 }
549
550
551 void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
552 {
553 map<string, string>::iterator iter = out_headers.find(header_name);
554 if (iter != out_headers.end()) {
555 str = iter->second;
556 } else {
557 str.clear();
558 }
559 }
560
561 static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
562 {
563 string err;
564 vector<string> vec;
565
566 get_str_vec(s, ".", vec);
567
568 if (vec.empty()) {
569 return -EINVAL;
570 }
571
572 long secs = strict_strtol(vec[0].c_str(), 10, &err);
573 long nsecs = 0;
574 if (!err.empty()) {
575 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
576 return -EINVAL;
577 }
578
579 if (vec.size() > 1) {
580 nsecs = strict_strtol(vec[1].c_str(), 10, &err);
581 if (!err.empty()) {
582 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
583 return -EINVAL;
584 }
585 }
586
587 *rt = utime_t(secs, nsecs).to_real_time();
588
589 return 0;
590 }
591
592 int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
593 {
594 int ret = http_manager.complete_requests();
595 if (ret < 0)
596 return ret;
597
598 set_str_from_headers(out_headers, "ETAG", etag);
599
600 if (mtime) {
601 string mtime_str;
602 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
603
604 ret = parse_rgwx_mtime(cct, mtime_str, mtime);
605 if (ret < 0) {
606 return ret;
607 }
608 }
609 return status;
610 }
611
612 int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
613 {
614 string urlsafe_bucket, urlsafe_object;
615 url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
616 url_encode(obj.key.name, urlsafe_object);
617 string resource = urlsafe_bucket + "/" + urlsafe_object;
618
619 return send_request(&key, extra_headers, resource, nullptr, mgr);
620 }
621
622 int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
623 bufferlist *send_data, RGWHTTPManager *mgr)
624 {
625 string new_url = url;
626 if (new_url[new_url.size() - 1] != '/')
627 new_url.append("/");
628
629 string date_str;
630 get_new_date_str(date_str);
631
632 RGWEnv new_env;
633 req_info new_info(cct, &new_env);
634
635 string params_str;
636 map<string, string>& args = new_info.args.get_params();
637 get_params_str(args, params_str);
638
639 /* merge params with extra args so that we can sign correctly */
640 for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
641 new_info.args.append(iter->first, iter->second);
642 }
643
644 string new_resource;
645 if (resource[0] == '/') {
646 new_resource = resource.substr(1);
647 } else {
648 new_resource = resource;
649 }
650
651 new_url.append(new_resource + params_str);
652
653 new_env.set("HTTP_DATE", date_str.c_str());
654
655 for (map<string, string>::iterator iter = extra_headers.begin();
656 iter != extra_headers.end(); ++iter) {
657 new_env.set(iter->first.c_str(), iter->second.c_str());
658 }
659
660 new_info.method = method;
661
662 new_info.script_uri = "/";
663 new_info.script_uri.append(new_resource);
664 new_info.request_uri = new_info.script_uri;
665
666 new_info.init_meta_info(NULL);
667
668 if (key) {
669 int ret = sign_request(*key, new_env, new_info);
670 if (ret < 0) {
671 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
672 return ret;
673 }
674 }
675
676 for (const auto& kv: new_env.get_map()) {
677 headers.emplace_back(kv);
678 }
679
680 bool send_data_hint = false;
681 if (send_data) {
682 outbl.claim(*send_data);
683 send_data_hint = true;
684 }
685
686 RGWHTTPManager *pmanager = &http_manager;
687 if (mgr) {
688 pmanager = mgr;
689 }
690
691 int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
692 if (r < 0)
693 return r;
694
695 if (!mgr) {
696 r = pmanager->complete_requests();
697 if (r < 0)
698 return r;
699 }
700
701 return 0;
702 }
703
704 int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
705 {
706 set_str_from_headers(out_headers, "ETAG", etag);
707 if (status >= 0) {
708 if (mtime) {
709 string mtime_str;
710 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
711 if (!mtime_str.empty()) {
712 int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
713 if (ret < 0) {
714 return ret;
715 }
716 } else {
717 *mtime = real_time();
718 }
719 }
720 if (psize) {
721 string size_str;
722 set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
723 string err;
724 *psize = strict_strtoll(size_str.c_str(), 10, &err);
725 if (!err.empty()) {
726 ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
727 return -EIO;
728 }
729 }
730 }
731
732 map<string, string>::iterator iter;
733 for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
734 const string& attr_name = iter->first;
735 if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
736 string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
737 const char *src = name.c_str();
738 char buf[name.size() + 1];
739 char *dest = buf;
740 for (; *src; ++src, ++dest) {
741 switch(*src) {
742 case '_':
743 *dest = '-';
744 break;
745 default:
746 *dest = tolower(*src);
747 }
748 }
749 *dest = '\0';
750 attrs[buf] = iter->second;
751 }
752 }
753 return status;
754 }
755
756 int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
757 {
758 if (name == "RGWX_EMBEDDED_METADATA_LEN") {
759 string err;
760 long len = strict_strtol(val.c_str(), 10, &err);
761 if (!err.empty()) {
762 ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
763 return -EINVAL;
764 }
765
766 cb->set_extra_data_len(len);
767 }
768 return 0;
769 }
770
771 int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
772 {
773 bufferptr bp((const char *)ptr, len);
774 bufferlist bl;
775 bl.append(bp);
776 int ret = cb->handle_data(bl, ofs, len);
777 if (ret < 0)
778 return ret;
779 ofs += len;
780 return len;
781 }
782
783 int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
784 {
785 if (outbl.length() == 0) {
786 return 0;
787 }
788
789 uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
790 if (send_size > 0) {
791 memcpy(ptr, outbl.c_str() + write_ofs, send_size);
792 write_ofs += send_size;
793 }
794 return send_size;
795 }
796
797 class StreamIntoBufferlist : public RGWGetDataCB {
798 bufferlist& bl;
799 public:
800 StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
801 int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
802 bl.claim_append(inbl);
803 return bl_len;
804 }
805 };
806