]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_client.cc
fb61f326bd2f87d33d18b8972ebacac4b124fed7
[ceph.git] / ceph / src / rgw / rgw_rest_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_common.h"
5 #include "rgw_rest_client.h"
6 #include "rgw_auth_s3.h"
7 #include "rgw_http_errors.h"
8 #include "rgw_rados.h"
9
10 #include "common/ceph_crypto_cms.h"
11 #include "common/armor.h"
12 #include "common/strtol.h"
13 #include "include/str_list.h"
14 #include "rgw_crypt_sanitize.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rgw
18
19 int RGWRESTSimpleRequest::get_status()
20 {
21 int retcode = get_req_retcode();
22 if (retcode < 0) {
23 return retcode;
24 }
25 return status;
26 }
27
28 int RGWRESTSimpleRequest::handle_header(const string& name, const string& val)
29 {
30 if (name == "CONTENT_LENGTH") {
31 string err;
32 long len = strict_strtol(val.c_str(), 10, &err);
33 if (!err.empty()) {
34 ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
35 return -EINVAL;
36 }
37
38 max_response = len;
39 }
40
41 return 0;
42 }
43
44 int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
45 {
46 char line[len + 1];
47
48 char *s = (char *)ptr, *end = (char *)ptr + len;
49 char *p = line;
50 ldout(cct, 10) << "receive_http_header" << dendl;
51
52 while (s != end) {
53 if (*s == '\r') {
54 s++;
55 continue;
56 }
57 if (*s == '\n') {
58 *p = '\0';
59 ldout(cct, 10) << "received header:" << line << dendl;
60 // TODO: fill whatever data required here
61 char *l = line;
62 char *tok = strsep(&l, " \t:");
63 if (tok && l) {
64 while (*l == ' ')
65 l++;
66
67 if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
68 http_status = atoi(l);
69 if (http_status == 100) /* 100-continue response */
70 continue;
71 status = rgw_http_error_to_errno(http_status);
72 } else {
73 /* convert header field name to upper case */
74 char *src = tok;
75 char buf[len + 1];
76 size_t i;
77 for (i = 0; i < len && *src; ++i, ++src) {
78 switch (*src) {
79 case '-':
80 buf[i] = '_';
81 break;
82 default:
83 buf[i] = toupper(*src);
84 }
85 }
86 buf[i] = '\0';
87 out_headers[buf] = l;
88 int r = handle_header(buf, l);
89 if (r < 0)
90 return r;
91 }
92 }
93 }
94 if (s != end)
95 *p++ = *s++;
96 }
97 return 0;
98 }
99
100 static void get_new_date_str(string& date_str)
101 {
102 date_str = rgw_to_asctime(ceph_clock_now());
103 }
104
105 int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
106 {
107 string new_url = url;
108 string new_resource = resource;
109
110 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
111 new_url = new_url.substr(0, new_url.size() - 1);
112 } else if (resource[0] != '/') {
113 new_resource = "/";
114 new_resource.append(resource);
115 }
116 new_url.append(new_resource);
117
118 string date_str;
119 get_new_date_str(date_str);
120 headers.push_back(pair<string, string>("HTTP_DATE", date_str));
121
122 string canonical_header;
123 map<string, string> meta_map;
124 map<string, string> sub_resources;
125 rgw_create_s3_canonical_header(method, NULL, NULL, date_str.c_str(),
126 meta_map, new_url.c_str(), sub_resources,
127 canonical_header);
128
129 string digest;
130 try {
131 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
132 } catch (int ret) {
133 return ret;
134 }
135
136 string auth_hdr = "AWS " + key.id + ":" + digest;
137
138 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
139
140 headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
141 int r = process(method, new_url.c_str());
142 if (r < 0)
143 return r;
144
145 return status;
146 }
147
148 int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
149 {
150 if (!send_iter)
151 return 0;
152
153 if (len > send_iter->get_remaining())
154 len = send_iter->get_remaining();
155
156 send_iter->copy(len, (char *)ptr);
157
158 return len;
159 }
160
161 int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
162 {
163 size_t cp_len, left_len;
164
165 left_len = max_response > response.length() ? (max_response - response.length()) : 0;
166 if (left_len == 0)
167 return 0; /* don't read extra data */
168
169 cp_len = (len > left_len) ? left_len : len;
170 bufferptr p((char *)ptr, cp_len);
171
172 response.append(p);
173
174 return 0;
175
176 }
177
178 void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
179 {
180 if (dest.empty()) {
181 dest.append("?");
182 } else {
183 dest.append("&");
184 }
185 string url_name;
186 url_encode(name, url_name);
187 dest.append(url_name);
188
189 if (!val.empty()) {
190 string url_val;
191 url_encode(val, url_val);
192 dest.append("=");
193 dest.append(url_val);
194 }
195 }
196
197 void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
198 {
199 map<string, string>::iterator miter;
200 for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
201 append_param(dest, miter->first, miter->second);
202 }
203 param_vec_t::iterator iter;
204 for (iter = params.begin(); iter != params.end(); ++iter) {
205 append_param(dest, iter->first, iter->second);
206 }
207 }
208
209 int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
210 {
211 /* don't sign if no key is provided */
212 if (key.key.empty()) {
213 return 0;
214 }
215
216 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
217 for (const auto& i: env.get_map()) {
218 ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
219 }
220 }
221
222 string canonical_header;
223 if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
224 ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
225 return -EINVAL;
226 }
227
228 ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
229
230 string digest;
231 try {
232 digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
233 } catch (int ret) {
234 return ret;
235 }
236
237 string auth_hdr = "AWS " + key.id + ":" + digest;
238 ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
239
240 env.set("AUTHORIZATION", auth_hdr);
241
242 return 0;
243 }
244
245 int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
246 {
247
248 string date_str;
249 get_new_date_str(date_str);
250
251 RGWEnv new_env;
252 req_info new_info(cct, &new_env);
253 new_info.rebuild_from(info);
254
255 new_env.set("HTTP_DATE", date_str.c_str());
256
257 int ret = sign_request(key, new_env, new_info);
258 if (ret < 0) {
259 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
260 return ret;
261 }
262
263 for (const auto& kv: new_env.get_map()) {
264 headers.emplace_back(kv);
265 }
266
267 map<string, string>& meta_map = new_info.x_meta_map;
268 for (const auto& kv: meta_map) {
269 headers.emplace_back(kv);
270 }
271
272 string params_str;
273 get_params_str(info.args.get_params(), params_str);
274
275 string new_url = url;
276 string& resource = new_info.request_uri;
277 string new_resource = resource;
278 if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
279 new_url = new_url.substr(0, new_url.size() - 1);
280 } else if (resource[0] != '/') {
281 new_resource = "/";
282 new_resource.append(resource);
283 }
284 new_url.append(new_resource + params_str);
285
286 bufferlist::iterator bliter;
287
288 if (inbl) {
289 bliter = inbl->begin();
290 send_iter = &bliter;
291
292 set_send_length(inbl->length());
293 }
294
295 int r = process(new_info.method, new_url.c_str());
296 if (r < 0){
297 if (r == -EINVAL){
298 // curl_easy has errored, generally means the service is not available
299 r = -ERR_SERVICE_UNAVAILABLE;
300 }
301 return r;
302 }
303
304 response.append((char)0); /* NULL terminate response */
305
306 if (outbl) {
307 outbl->claim(response);
308 }
309
310 return status;
311 }
312
313 class RGWRESTStreamOutCB : public RGWGetDataCB {
314 RGWRESTStreamWriteRequest *req;
315 public:
316 explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
317 int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
318 };
319
320 int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
321 {
322 dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
323 if (!bl_ofs && bl_len == bl.length()) {
324 return req->add_output_data(bl);
325 }
326
327 bufferptr bp(bl.c_str() + bl_ofs, bl_len);
328 bufferlist new_bl;
329 new_bl.push_back(bp);
330
331 return req->add_output_data(new_bl);
332 }
333
334 RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
335 {
336 delete cb;
337 }
338
339 int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
340 {
341 lock.Lock();
342 if (status < 0) {
343 int ret = status;
344 lock.Unlock();
345 return ret;
346 }
347 pending_send.push_back(bl);
348 lock.Unlock();
349
350 bool done;
351 return http_manager.process_requests(false, &done);
352 }
353
354 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
355 {
356 string& s = grants_by_type[perm];
357
358 if (!s.empty())
359 s.append(", ");
360
361 string id_type_str;
362 ACLGranteeType& type = grant.get_type();
363 switch (type.get_type()) {
364 case ACL_TYPE_GROUP:
365 id_type_str = "uri";
366 break;
367 case ACL_TYPE_EMAIL_USER:
368 id_type_str = "emailAddress";
369 break;
370 default:
371 id_type_str = "id";
372 }
373 rgw_user id;
374 grant.get_id(id);
375 s.append(id_type_str + "=\"" + id.to_str() + "\"");
376 }
377
378 struct grant_type_to_header {
379 int type;
380 const char *header;
381 };
382
383 struct grant_type_to_header grants_headers_def[] = {
384 { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
385 { RGW_PERM_READ, "x-amz-grant-read"},
386 { RGW_PERM_WRITE, "x-amz-grant-write"},
387 { RGW_PERM_READ_ACP, "x-amz-grant-read-acp"},
388 { RGW_PERM_WRITE_ACP, "x-amz-grant-write-acp"},
389 { 0, NULL}
390 };
391
392 static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
393 {
394 if ((perm & check_perm) == perm) {
395 grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
396 return true;
397 }
398 return false;
399 }
400
401 static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
402 {
403 struct grant_type_to_header *t;
404
405 for (t = grants_headers_def; t->header; t++) {
406 if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
407 return;
408 }
409 }
410
411 static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
412 {
413 struct grant_type_to_header *t;
414
415 for (t = grants_headers_def; t->header; t++) {
416 map<int, string>::iterator iter = grants.find(t->type);
417 if (iter != grants.end()) {
418 env.set(t->header,iter->second);
419 meta_map[t->header] = iter->second;
420 }
421 }
422 }
423
424 int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
425 {
426 string resource = obj.bucket.name + "/" + obj.get_oid();
427 string new_url = url;
428 if (new_url[new_url.size() - 1] != '/')
429 new_url.append("/");
430
431 string date_str;
432 get_new_date_str(date_str);
433
434 RGWEnv new_env;
435 req_info new_info(cct, &new_env);
436
437 string params_str;
438 map<string, string>& args = new_info.args.get_params();
439 get_params_str(args, params_str);
440
441 new_url.append(resource + params_str);
442
443 new_env.set("HTTP_DATE", date_str.c_str());
444
445 new_info.method = "PUT";
446
447 new_info.script_uri = "/";
448 new_info.script_uri.append(resource);
449 new_info.request_uri = new_info.script_uri;
450
451 /* merge send headers */
452 for (auto& attr: attrs) {
453 bufferlist& bl = attr.second;
454 const string& name = attr.first;
455 string val = bl.c_str();
456 if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
457 string header_name = RGW_AMZ_META_PREFIX;
458 header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
459 new_env.set(header_name, val);
460 new_info.x_meta_map[header_name] = val;
461 }
462 }
463 RGWAccessControlPolicy policy;
464 int ret = rgw_policy_from_attrset(cct, attrs, &policy);
465 if (ret < 0) {
466 ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
467 return ret;
468 }
469
470 /* update acl headers */
471 RGWAccessControlList& acl = policy.get_acl();
472 multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
473 multimap<string, ACLGrant>::iterator giter;
474 map<int, string> grants_by_type;
475 for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
476 ACLGrant& grant = giter->second;
477 ACLPermission& perm = grant.get_permission();
478 grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
479 }
480 add_grants_headers(grants_by_type, new_env, new_info.x_meta_map);
481 ret = sign_request(key, new_env, new_info);
482 if (ret < 0) {
483 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
484 return ret;
485 }
486
487 for (const auto& kv: new_env.get_map()) {
488 headers.emplace_back(kv);
489 }
490
491 cb = new RGWRESTStreamOutCB(this);
492
493 set_send_length(obj_size);
494
495 int r = http_manager.add_request(this, new_info.method, new_url.c_str());
496 if (r < 0)
497 return r;
498
499 return 0;
500 }
501
502 int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
503 {
504 uint64_t sent = 0;
505
506 dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
507 lock.Lock();
508 if (pending_send.empty() || status < 0) {
509 lock.Unlock();
510 return status;
511 }
512
513 list<bufferlist>::iterator iter = pending_send.begin();
514 while (iter != pending_send.end() && len > 0) {
515 bufferlist& bl = *iter;
516
517 list<bufferlist>::iterator next_iter = iter;
518 ++next_iter;
519 lock.Unlock();
520
521 uint64_t send_len = min(len, (size_t)bl.length());
522
523 memcpy(ptr, bl.c_str(), send_len);
524
525 ptr = (char *)ptr + send_len;
526 len -= send_len;
527 sent += send_len;
528
529 lock.Lock();
530
531 bufferlist new_bl;
532 if (bl.length() > send_len) {
533 bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
534 new_bl.append(bp);
535 }
536 pending_send.pop_front(); /* need to do this after we copy data from bl */
537 if (new_bl.length()) {
538 pending_send.push_front(new_bl);
539 }
540 iter = next_iter;
541 }
542 lock.Unlock();
543
544 return sent;
545 }
546
547
548 void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
549 {
550 map<string, string>::iterator iter = out_headers.find(header_name);
551 if (iter != out_headers.end()) {
552 str = iter->second;
553 } else {
554 str.clear();
555 }
556 }
557
558 static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
559 {
560 string err;
561 vector<string> vec;
562
563 get_str_vec(s, ".", vec);
564
565 if (vec.empty()) {
566 return -EINVAL;
567 }
568
569 long secs = strict_strtol(vec[0].c_str(), 10, &err);
570 long nsecs = 0;
571 if (!err.empty()) {
572 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
573 return -EINVAL;
574 }
575
576 if (vec.size() > 1) {
577 nsecs = strict_strtol(vec[1].c_str(), 10, &err);
578 if (!err.empty()) {
579 ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
580 return -EINVAL;
581 }
582 }
583
584 *rt = utime_t(secs, nsecs).to_real_time();
585
586 return 0;
587 }
588
589 int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
590 {
591 int ret = http_manager.complete_requests();
592 if (ret < 0)
593 return ret;
594
595 set_str_from_headers(out_headers, "ETAG", etag);
596
597 if (mtime) {
598 string mtime_str;
599 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
600
601 ret = parse_rgwx_mtime(cct, mtime_str, mtime);
602 if (ret < 0) {
603 return ret;
604 }
605 }
606 return status;
607 }
608
609 int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
610 {
611 string urlsafe_bucket, urlsafe_object;
612 url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
613 url_encode(obj.key.name, urlsafe_object);
614 string resource = urlsafe_bucket + "/" + urlsafe_object;
615
616 return send_request(&key, extra_headers, resource, nullptr, mgr);
617 }
618
619 int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
620 bufferlist *send_data, RGWHTTPManager *mgr)
621 {
622 string new_url = url;
623 if (new_url[new_url.size() - 1] != '/')
624 new_url.append("/");
625
626 string date_str;
627 get_new_date_str(date_str);
628
629 RGWEnv new_env;
630 req_info new_info(cct, &new_env);
631
632 string params_str;
633 map<string, string>& args = new_info.args.get_params();
634 get_params_str(args, params_str);
635
636 /* merge params with extra args so that we can sign correctly */
637 for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
638 new_info.args.append(iter->first, iter->second);
639 }
640
641 string new_resource;
642 if (resource[0] == '/') {
643 new_resource = resource.substr(1);
644 } else {
645 new_resource = resource;
646 }
647
648 new_url.append(new_resource + params_str);
649
650 new_env.set("HTTP_DATE", date_str.c_str());
651
652 for (map<string, string>::iterator iter = extra_headers.begin();
653 iter != extra_headers.end(); ++iter) {
654 new_env.set(iter->first.c_str(), iter->second.c_str());
655 }
656
657 new_info.method = method;
658
659 new_info.script_uri = "/";
660 new_info.script_uri.append(new_resource);
661 new_info.request_uri = new_info.script_uri;
662
663 new_info.init_meta_info(NULL);
664
665 if (key) {
666 int ret = sign_request(*key, new_env, new_info);
667 if (ret < 0) {
668 ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
669 return ret;
670 }
671 }
672
673 for (const auto& kv: new_env.get_map()) {
674 headers.emplace_back(kv);
675 }
676
677 bool send_data_hint = false;
678 if (send_data) {
679 outbl.claim(*send_data);
680 send_data_hint = true;
681 }
682
683 RGWHTTPManager *pmanager = &http_manager;
684 if (mgr) {
685 pmanager = mgr;
686 }
687
688 int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
689 if (r < 0)
690 return r;
691
692 if (!mgr) {
693 r = pmanager->complete_requests();
694 if (r < 0)
695 return r;
696 }
697
698 return 0;
699 }
700
701 int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
702 {
703 set_str_from_headers(out_headers, "ETAG", etag);
704 if (status >= 0) {
705 if (mtime) {
706 string mtime_str;
707 set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
708 if (!mtime_str.empty()) {
709 int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
710 if (ret < 0) {
711 return ret;
712 }
713 } else {
714 *mtime = real_time();
715 }
716 }
717 if (psize) {
718 string size_str;
719 set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
720 string err;
721 *psize = strict_strtoll(size_str.c_str(), 10, &err);
722 if (!err.empty()) {
723 ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
724 return -EIO;
725 }
726 }
727 }
728
729 map<string, string>::iterator iter;
730 for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
731 const string& attr_name = iter->first;
732 if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
733 string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
734 const char *src = name.c_str();
735 char buf[name.size() + 1];
736 char *dest = buf;
737 for (; *src; ++src, ++dest) {
738 switch(*src) {
739 case '_':
740 *dest = '-';
741 break;
742 default:
743 *dest = tolower(*src);
744 }
745 }
746 *dest = '\0';
747 attrs[buf] = iter->second;
748 }
749 }
750 return status;
751 }
752
753 int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
754 {
755 if (name == "RGWX_EMBEDDED_METADATA_LEN") {
756 string err;
757 long len = strict_strtol(val.c_str(), 10, &err);
758 if (!err.empty()) {
759 ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
760 return -EINVAL;
761 }
762
763 cb->set_extra_data_len(len);
764 }
765 return 0;
766 }
767
768 int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
769 {
770 bufferptr bp((const char *)ptr, len);
771 bufferlist bl;
772 bl.append(bp);
773 int ret = cb->handle_data(bl, ofs, len);
774 if (ret < 0)
775 return ret;
776 ofs += len;
777 return len;
778 }
779
780 int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
781 {
782 if (outbl.length() == 0) {
783 return 0;
784 }
785
786 uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
787 if (send_size > 0) {
788 memcpy(ptr, outbl.c_str() + write_ofs, send_size);
789 write_ofs += send_size;
790 }
791 return send_size;
792 }
793
794 class StreamIntoBufferlist : public RGWGetDataCB {
795 bufferlist& bl;
796 public:
797 StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
798 int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
799 bl.claim_append(inbl);
800 return bl_len;
801 }
802 };
803