]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc_tier.cc
704b7dee2e2714fcc4d5d045c937f3eee2ace011
[ceph.git] / ceph / src / rgw / rgw_lc_tier.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <string.h>
5 #include <iostream>
6 #include <map>
7
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "rgw_lc.h"
11 #include "rgw_lc_tier.h"
12 #include "rgw_string.h"
13 #include "rgw_zone.h"
14 #include "rgw_common.h"
15 #include "rgw_rest.h"
16 #include "svc_zone.h"
17
18 #include <boost/algorithm/string/split.hpp>
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/predicate.hpp>
21
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
24
25 using namespace std;
26
/* Bookkeeping for a single part of a multipart upload to the cloud tier. */
struct rgw_lc_multipart_part_info {
  int part_num = 0;   /* part number used in the upload request */
  uint64_t ofs = 0;   /* offset of the part's first byte in the source object */
  uint64_t size = 0;  /* length of the part in bytes */
  std::string etag;   /* etag recorded for the part once it has been sent */
};
33
34 struct rgw_lc_obj_properties {
35 ceph::real_time mtime;
36 std::string etag;
37 uint64_t versioned_epoch{0};
38 std::map<std::string, RGWTierACLMapping>& target_acl_mappings;
39 std::string target_storage_class;
40
41 rgw_lc_obj_properties(ceph::real_time _mtime, std::string _etag,
42 uint64_t _versioned_epoch, std::map<std::string,
43 RGWTierACLMapping>& _t_acl_mappings,
44 std::string _t_storage_class) :
45 mtime(_mtime), etag(_etag),
46 versioned_epoch(_versioned_epoch),
47 target_acl_mappings(_t_acl_mappings),
48 target_storage_class(_t_storage_class) {}
49 };
50
51 struct rgw_lc_multipart_upload_info {
52 std::string upload_id;
53 uint64_t obj_size;
54 ceph::real_time mtime;
55 std::string etag;
56
57 void encode(bufferlist& bl) const {
58 ENCODE_START(1, 1, bl);
59 encode(upload_id, bl);
60 encode(obj_size, bl);
61 encode(mtime, bl);
62 encode(etag, bl);
63 ENCODE_FINISH(bl);
64 }
65
66 void decode(bufferlist::const_iterator& bl) {
67 DECODE_START(1, bl);
68 decode(upload_id, bl);
69 decode(obj_size, bl);
70 decode(mtime, bl);
71 decode(etag, bl);
72 DECODE_FINISH(bl);
73 }
74 };
75 WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info)
76
77 static inline string get_key_instance(const rgw_obj_key& key)
78 {
79 if (!key.instance.empty() &&
80 !key.have_null_instance()) {
81 return "-" + key.instance;
82 }
83 return "";
84 }
85
86 static inline string get_key_oid(const rgw_obj_key& key)
87 {
88 string oid = key.name;
89 if (!key.instance.empty() &&
90 !key.have_null_instance()) {
91 oid += string("-") + key.instance;
92 }
93 return oid;
94 }
95
96 static inline string obj_to_aws_path(const rgw_obj& obj)
97 {
98 string path = obj.bucket.name + "/" + get_key_oid(obj.key);
99 return path;
100 }
101
102 static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
103 const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
104 {
105 int ret = 0;
106 rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
107
108 if (!rados) {
109 ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
110 return -1;
111 }
112
113 auto& pool = status_obj->pool;
114 const auto oid = status_obj->oid;
115 auto obj_ctx = rados->svc()->sysobj->init_obj_ctx();
116 bufferlist bl;
117
118 ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, nullptr, nullptr,
119 null_yield, dpp);
120
121 if (ret < 0) {
122 return ret;
123 }
124
125 if (bl.length() > 0) {
126 try {
127 auto p = bl.cbegin();
128 status->decode(p);
129 } catch (buffer::error& e) {
130 ldpp_dout(dpp, 10) << "failed to decode status obj: "
131 << e.what() << dendl;
132 return -EIO;
133 }
134 } else {
135 return -EIO;
136 }
137
138 return 0;
139 }
140
141 static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
142 const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
143 {
144 int ret = 0;
145 rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
146
147 if (!rados) {
148 ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
149 return -1;
150 }
151
152 auto& pool = status_obj->pool;
153 const auto oid = status_obj->oid;
154 auto obj_ctx = rados->svc()->sysobj->init_obj_ctx();
155 bufferlist bl;
156 status->encode(bl);
157
158 ret = rgw_put_system_obj(dpp, obj_ctx, pool, oid, bl, true, nullptr,
159 real_time{}, null_yield);
160
161 return ret;
162 }
163
164 static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
165 const rgw_raw_obj *status_obj)
166 {
167 int ret = 0;
168 rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
169
170 if (!rados) {
171 ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
172 return -1;
173 }
174
175 auto& pool = status_obj->pool;
176 const auto oid = status_obj->oid;
177 auto sysobj = rados->svc()->sysobj;
178
179 ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield);
180
181 return ret;
182 }
183
/*
 * Header names that RGWLCCloudStreamPut::keep_attr() always accepts, in
 * addition to anything starting with "X_AMZ_".
 */
static std::set<std::string> keep_headers{
  "CONTENT_TYPE",
  "CONTENT_ENCODING",
  "CONTENT_DISPOSITION",
  "CONTENT_LANGUAGE",
};
188
189 /*
190 * mapping between rgw object attrs and output http fields
191 *
192 static const struct rgw_http_attr base_rgw_to_http_attrs[] = {
193 { RGW_ATTR_CONTENT_LANG, "Content-Language" },
194 { RGW_ATTR_EXPIRES, "Expires" },
195 { RGW_ATTR_CACHE_CONTROL, "Cache-Control" },
196 { RGW_ATTR_CONTENT_DISP, "Content-Disposition" },
197 { RGW_ATTR_CONTENT_ENC, "Content-Encoding" },
198 { RGW_ATTR_USER_MANIFEST, "X-Object-Manifest" },
199 { RGW_ATTR_X_ROBOTS_TAG , "X-Robots-Tag" },
200 { RGW_ATTR_STORAGE_CLASS , "X-Amz-Storage-Class" },
201 // RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION header depends on access mode:
202 // S3 endpoint: x-amz-website-redirect-location
203 // S3Website endpoint: Location
204 { RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION, "x-amz-website-redirect-location" },
205 }; */
206
207 static void init_headers(map<string, bufferlist>& attrs,
208 map<string, string>& headers)
209 {
210 for (auto& kv : attrs) {
211 const char * name = kv.first.c_str();
212 const auto aiter = rgw_to_http_attrs.find(name);
213
214 if (aiter != std::end(rgw_to_http_attrs)) {
215 headers[aiter->second] = rgw_bl_str(kv.second);
216 } else if (strncmp(name, RGW_ATTR_META_PREFIX,
217 sizeof(RGW_ATTR_META_PREFIX)-1) == 0) {
218 name += sizeof(RGW_ATTR_META_PREFIX) - 1;
219 string sname(name);
220 string name_prefix = RGW_ATTR_META_PREFIX;
221 char full_name_buf[name_prefix.size() + sname.size() + 1];
222 snprintf(full_name_buf, sizeof(full_name_buf), "%.*s%.*s",
223 static_cast<int>(name_prefix.length()),
224 name_prefix.data(),
225 static_cast<int>(sname.length()),
226 sname.data());
227 headers[full_name_buf] = rgw_bl_str(kv.second);
228 } else if (strcmp(name,RGW_ATTR_CONTENT_TYPE) == 0) {
229 headers["CONTENT_TYPE"] = rgw_bl_str(kv.second);
230 }
231 }
232 }
233
234 /* Read object or just head from remote endpoint. For now initializes only headers,
235 * but can be extended to fetch etag, mtime etc if needed.
236 */
237 static int cloud_tier_get_object(RGWLCCloudTierCtx& tier_ctx, bool head,
238 std::map<std::string, std::string>& headers) {
239 RGWRESTConn::get_obj_params req_params;
240 RGWBucketInfo b;
241 std::string target_obj_name;
242 int ret = 0;
243 std::unique_ptr<rgw::sal::Bucket> dest_bucket;
244 std::unique_ptr<rgw::sal::Object> dest_obj;
245 rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
246 tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
247 tier_ctx.target_storage_class);
248 std::string etag;
249 RGWRESTStreamRWRequest *in_req;
250
251 b.bucket.name = tier_ctx.target_bucket_name;
252 target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
253 tier_ctx.obj->get_name();
254 if (!tier_ctx.o.is_current()) {
255 target_obj_name += get_key_instance(tier_ctx.obj->get_key());
256 }
257
258 ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
259 if (ret < 0) {
260 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << ret << dendl;
261 return ret;
262 }
263
264 dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
265 if (!dest_obj) {
266 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
267 return -1;
268 }
269 /* init input connection */
270 req_params.get_op = !head;
271 req_params.prepend_metadata = true;
272 req_params.rgwx_stat = true;
273 req_params.sync_manifest = true;
274 req_params.skip_decrypt = true;
275
276 ret = tier_ctx.conn.get_obj(tier_ctx.dpp, dest_obj.get(), req_params, true /* send */, &in_req);
277 if (ret < 0) {
278 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: " << __func__ << "(): conn.get_obj() returned ret=" << ret << dendl;
279 return ret;
280 }
281
282 /* fetch headers */
283 ret = tier_ctx.conn.complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
284 if (ret < 0 && ret != -ENOENT) {
285 ldpp_dout(tier_ctx.dpp, 20) << "ERROR: " << __func__ << "(): conn.complete_request() returned ret=" << ret << dendl;
286 return ret;
287 }
288 return 0;
289 }
290
291 static bool is_already_tiered(const DoutPrefixProvider *dpp,
292 std::map<std::string, std::string>& headers,
293 ceph::real_time& mtime) {
294 char buf[32];
295 map<string, string> attrs = headers;
296
297 for (const auto& a : attrs) {
298 ldpp_dout(dpp, 20) << "GetCrf attr[" << a.first << "] = " << a.second <<dendl;
299 }
300 utime_t ut(mtime);
301 snprintf(buf, sizeof(buf), "%lld.%09lld",
302 (long long)ut.sec(),
303 (long long)ut.nsec());
304
305 string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
306
307 if (s.empty())
308 s = attrs["x_amz_meta_rgwx_source_mtime"];
309
310 ldpp_dout(dpp, 20) << "is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
311 ldpp_dout(dpp, 20) << "is_already_tiered mtime buf = " << buf <<dendl;
312
313 if (!s.empty() && !strcmp(s.c_str(), buf)){
314 return 1;
315 }
316 return 0;
317 }
318
319 /* Read object locally & also initialize dest rest obj based on read attrs */
320 class RGWLCStreamRead
321 {
322 CephContext *cct;
323 const DoutPrefixProvider *dpp;
324 std::map<std::string, bufferlist> attrs;
325 uint64_t obj_size;
326 rgw::sal::Object *obj;
327 const real_time &mtime;
328
329 bool multipart;
330 uint64_t m_part_size;
331 off_t m_part_off;
332 off_t m_part_end;
333
334 std::unique_ptr<rgw::sal::Object::ReadOp> read_op;
335 off_t ofs;
336 off_t end;
337 rgw_rest_obj rest_obj;
338
339 int retcode;
340
341 public:
342 RGWLCStreamRead(CephContext *_cct, const DoutPrefixProvider *_dpp,
343 RGWObjectCtx& obj_ctx, rgw::sal::Object *_obj,
344 const real_time &_mtime) :
345 cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime),
346 read_op(obj->get_read_op(&obj_ctx)) {}
347
348 ~RGWLCStreamRead() {};
349 int set_range(off_t _ofs, off_t _end);
350 int get_range(off_t &_ofs, off_t &_end);
351 rgw_rest_obj& get_rest_obj();
352 void set_multipart(uint64_t part_size, off_t part_off, off_t part_end);
353 int init();
354 int init_rest_obj();
355 int read(off_t ofs, off_t end, RGWGetDataCB *out_cb);
356 };
357
358 /* Send PUT op to remote endpoint */
359 class RGWLCCloudStreamPut
360 {
361 const DoutPrefixProvider *dpp;
362 rgw_lc_obj_properties obj_properties;
363 RGWRESTConn& conn;
364 rgw::sal::Object *dest_obj;
365 std::string etag;
366 RGWRESTStreamS3PutObj *out_req{nullptr};
367
368 struct multipart_info {
369 bool is_multipart{false};
370 std::string upload_id;
371 int part_num{0};
372 uint64_t part_size;
373 } multipart;
374
375 int retcode;
376
377 public:
378 RGWLCCloudStreamPut(const DoutPrefixProvider *_dpp,
379 const rgw_lc_obj_properties& _obj_properties,
380 RGWRESTConn& _conn,
381 rgw::sal::Object *_dest_obj) :
382 dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
383 }
384 int init();
385 static bool keep_attr(const std::string& h);
386 static void init_send_attrs(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj,
387 const rgw_lc_obj_properties& obj_properties,
388 std::map<std::string, std::string>& attrs);
389 void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj);
390 void handle_headers(const std::map<std::string, std::string>& headers);
391 bool get_etag(std::string *petag);
392 void set_multipart(const std::string& upload_id, int part_num, uint64_t part_size);
393 int send();
394 RGWGetDataCB *get_cb();
395 int complete_request();
396 };
397
398 int RGWLCStreamRead::set_range(off_t _ofs, off_t _end) {
399 ofs = _ofs;
400 end = _end;
401
402 return 0;
403 }
404
405 int RGWLCStreamRead::get_range(off_t &_ofs, off_t &_end) {
406 _ofs = ofs;
407 _end = end;
408
409 return 0;
410 }
411
412 rgw_rest_obj& RGWLCStreamRead::get_rest_obj() {
413 return rest_obj;
414 }
415
416 void RGWLCStreamRead::set_multipart(uint64_t part_size, off_t part_off, off_t part_end) {
417 multipart = true;
418 m_part_size = part_size;
419 m_part_off = part_off;
420 m_part_end = part_end;
421 }
422
423 int RGWLCStreamRead::init() {
424 optional_yield y = null_yield;
425 real_time read_mtime;
426
427 read_op->params.lastmod = &read_mtime;
428
429 int ret = read_op->prepare(y, dpp);
430 if (ret < 0) {
431 ldpp_dout(dpp, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl;
432 return ret;
433 }
434
435 if (read_mtime != mtime) {
436 /* raced */
437 return -ECANCELED;
438 }
439
440 attrs = obj->get_attrs();
441 obj_size = obj->get_obj_size();
442
443 ret = init_rest_obj();
444 if (ret < 0) {
445 ldpp_dout(dpp, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl;
446 return ret;
447 }
448
449 if (!multipart) {
450 set_range(0, obj_size - 1);
451 } else {
452 set_range(m_part_off, m_part_end);
453 }
454 return 0;
455 }
456
457 int RGWLCStreamRead::init_rest_obj() {
458 /* Initialize rgw_rest_obj.
459 * Reference: do_decode_rest_obj
460 * Check how to copy headers content */
461 rest_obj.init(obj->get_key());
462
463 if (!multipart) {
464 rest_obj.content_len = obj_size;
465 } else {
466 rest_obj.content_len = m_part_size;
467 }
468
469 /* For mulitpart attrs are sent as part of InitMultipartCR itself */
470 if (multipart) {
471 return 0;
472 }
473
474 /*
475 * XXX: verify if its right way to copy attrs into rest obj
476 */
477 init_headers(attrs, rest_obj.attrs);
478
479 rest_obj.acls.set_ctx(cct);
480 const auto aiter = attrs.find(RGW_ATTR_ACL);
481 if (aiter != attrs.end()) {
482 bufferlist& bl = aiter->second;
483 auto bliter = bl.cbegin();
484 try {
485 rest_obj.acls.decode(bliter);
486 } catch (buffer::error& err) {
487 ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
488 return -EIO;
489 }
490 } else {
491 ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
492 }
493 return 0;
494 }
495
496 int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) {
497 int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield);
498 return ret;
499 }
500
501 int RGWLCCloudStreamPut::init() {
502 /* init output connection */
503 if (multipart.is_multipart) {
504 char buf[32];
505 snprintf(buf, sizeof(buf), "%d", multipart.part_num);
506 rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
507 { "partNumber", buf },
508 { nullptr, nullptr } };
509 conn.put_obj_send_init(dest_obj, params, &out_req);
510 } else {
511 conn.put_obj_send_init(dest_obj, nullptr, &out_req);
512 }
513
514 return 0;
515 }
516
517 bool RGWLCCloudStreamPut::keep_attr(const string& h) {
518 return (keep_headers.find(h) != keep_headers.end() ||
519 boost::algorithm::starts_with(h, "X_AMZ_"));
520 }
521
522 void RGWLCCloudStreamPut::init_send_attrs(const DoutPrefixProvider *dpp,
523 const rgw_rest_obj& rest_obj,
524 const rgw_lc_obj_properties& obj_properties,
525 std::map<string, string>& attrs) {
526
527 map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
528 const std::string& target_storage_class = obj_properties.target_storage_class;
529
530 attrs.clear();
531
532 for (auto& hi : rest_obj.attrs) {
533 if (keep_attr(hi.first)) {
534 attrs.insert(hi);
535 }
536 }
537
538 const auto acl = rest_obj.acls.get_acl();
539
540 map<int, vector<string> > access_map;
541
542 if (!acl_mappings.empty()) {
543 for (auto& grant : acl.get_grant_map()) {
544 auto& orig_grantee = grant.first;
545 auto& perm = grant.second;
546
547 string grantee;
548
549 const auto& am = acl_mappings;
550
551 const auto iter = am.find(orig_grantee);
552 if (iter == am.end()) {
553 ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
554 continue;
555 }
556
557 grantee = iter->second.dest_id;
558
559 string type;
560
561 switch (iter->second.type) {
562 case ACL_TYPE_CANON_USER:
563 type = "id";
564 break;
565 case ACL_TYPE_EMAIL_USER:
566 type = "emailAddress";
567 break;
568 case ACL_TYPE_GROUP:
569 type = "uri";
570 break;
571 default:
572 continue;
573 }
574
575 string tv = type + "=" + grantee;
576
577 int flags = perm.get_permission().get_permissions();
578 if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
579 access_map[flags].push_back(tv);
580 continue;
581 }
582
583 for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
584 if (flags & i) {
585 access_map[i].push_back(tv);
586 }
587 }
588 }
589 }
590
591 for (const auto& aiter : access_map) {
592 int grant_type = aiter.first;
593
594 string header_str("x-amz-grant-");
595
596 switch (grant_type) {
597 case RGW_PERM_READ:
598 header_str.append("read");
599 break;
600 case RGW_PERM_WRITE:
601 header_str.append("write");
602 break;
603 case RGW_PERM_READ_ACP:
604 header_str.append("read-acp");
605 break;
606 case RGW_PERM_WRITE_ACP:
607 header_str.append("write-acp");
608 break;
609 case RGW_PERM_FULL_CONTROL:
610 header_str.append("full-control");
611 break;
612 }
613
614 string s;
615
616 for (const auto& viter : aiter.second) {
617 if (!s.empty()) {
618 s.append(", ");
619 }
620 s.append(viter);
621 }
622
623 ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
624
625 attrs[header_str] = s;
626 }
627
628 /* Copy target storage class */
629 if (!target_storage_class.empty()) {
630 attrs["x-amz-storage-class"] = target_storage_class;
631 } else {
632 attrs["x-amz-storage-class"] = "STANDARD";
633 }
634
635 /* New attribute to specify its transitioned from RGW */
636 attrs["x-amz-meta-rgwx-source"] = "rgw";
637
638 char buf[32];
639 snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
640 attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
641
642 utime_t ut(obj_properties.mtime);
643 snprintf(buf, sizeof(buf), "%lld.%09lld",
644 (long long)ut.sec(),
645 (long long)ut.nsec());
646
647 attrs["x-amz-meta-rgwx-source-mtime"] = buf;
648 attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
649 attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
650 if (!rest_obj.key.instance.empty()) {
651 attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
652 }
653 for (const auto& a : attrs) {
654 ldpp_dout(dpp, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <<dendl;
655 }
656 }
657
658 void RGWLCCloudStreamPut::send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) {
659 auto r = static_cast<RGWRESTStreamS3PutObj *>(out_req);
660
661 std::map<std::string, std::string> new_attrs;
662 if (!multipart.is_multipart) {
663 init_send_attrs(dpp, rest_obj, obj_properties, new_attrs);
664 }
665
666 r->set_send_length(rest_obj.content_len);
667
668 RGWAccessControlPolicy policy;
669
670 r->send_ready(dpp, conn.get_key(), new_attrs, policy);
671 }
672
673 void RGWLCCloudStreamPut::handle_headers(const map<string, string>& headers) {
674 for (const auto& h : headers) {
675 if (h.first == "ETAG") {
676 etag = h.second;
677 }
678 }
679 }
680
681 bool RGWLCCloudStreamPut::get_etag(string *petag) {
682 if (etag.empty()) {
683 return false;
684 }
685 *petag = etag;
686 return true;
687 }
688
689 void RGWLCCloudStreamPut::set_multipart(const string& upload_id, int part_num, uint64_t part_size) {
690 multipart.is_multipart = true;
691 multipart.upload_id = upload_id;
692 multipart.part_num = part_num;
693 multipart.part_size = part_size;
694 }
695
696 int RGWLCCloudStreamPut::send() {
697 int ret = RGWHTTP::send(out_req);
698 return ret;
699 }
700
701 RGWGetDataCB *RGWLCCloudStreamPut::get_cb() {
702 return out_req->get_out_cb();
703 }
704
705 int RGWLCCloudStreamPut::complete_request() {
706 int ret = conn.complete_request(out_req, etag, &obj_properties.mtime, null_yield);
707 return ret;
708 }
709
710 /* Read local copy and write to Cloud endpoint */
711 static int cloud_tier_transfer_object(const DoutPrefixProvider* dpp,
712 RGWLCStreamRead* readf, RGWLCCloudStreamPut* writef) {
713 std::string url;
714 bufferlist bl;
715 bool sent_attrs{false};
716 int ret{0};
717 off_t ofs;
718 off_t end;
719
720 ret = readf->init();
721 if (ret < 0) {
722 ldpp_dout(dpp, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl;
723 return ret;
724 }
725 readf->get_range(ofs, end);
726 rgw_rest_obj& rest_obj = readf->get_rest_obj();
727 if (!sent_attrs) {
728 ret = writef->init();
729 if (ret < 0) {
730 ldpp_dout(dpp, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl;
731 return ret;
732 }
733
734 writef->send_ready(dpp, rest_obj);
735 ret = writef->send();
736 if (ret < 0) {
737 return ret;
738 }
739 sent_attrs = true;
740 }
741
742 ret = readf->read(ofs, end, writef->get_cb());
743
744 if (ret < 0) {
745 ldpp_dout(dpp, 0) << "ERROR: fail to read from in_crf, ret = " << ret << dendl;
746 return ret;
747 }
748
749 ret = writef->complete_request();
750 if (ret < 0) {
751 ldpp_dout(dpp, 0) << "ERROR: fail to complete request, ret = " << ret << dendl;
752 return ret;
753 }
754
755 return 0;
756 }
757
758 static int cloud_tier_plain_transfer(RGWLCCloudTierCtx& tier_ctx) {
759 int ret;
760 std::unique_ptr<rgw::sal::Bucket> dest_bucket;
761 std::unique_ptr<rgw::sal::Object> dest_obj;
762
763 rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
764 tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
765 tier_ctx.target_storage_class);
766 RGWBucketInfo b;
767 std::string target_obj_name;
768
769 b.bucket.name = tier_ctx.target_bucket_name;
770 target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
771 tier_ctx.obj->get_name();
772 if (!tier_ctx.o.is_current()) {
773 target_obj_name += get_key_instance(tier_ctx.obj->get_key());
774 }
775
776 ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
777 if (ret < 0) {
778 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl;
779 return ret;
780 }
781
782 dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
783 if (!dest_obj) {
784 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
785 return -1;
786 }
787
788 tier_ctx.obj->set_atomic(&tier_ctx.rctx);
789
790 /* Prepare Read from source */
791 /* TODO: Define readf, writef as stack variables. For some reason,
792 * when used as stack variables (esp., readf), the transition seems to
793 * be taking lot of time eventually erroring out at times.
794 */
795 std::shared_ptr<RGWLCStreamRead> readf;
796 readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
797 tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
798
799 std::shared_ptr<RGWLCCloudStreamPut> writef;
800 writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
801 dest_obj.get()));
802
803 /* actual Read & Write */
804 ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
805
806 return ret;
807 }
808
809 static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx,
810 const std::string& upload_id,
811 const rgw_lc_multipart_part_info& part_info,
812 std::string *petag) {
813 int ret;
814 std::unique_ptr<rgw::sal::Bucket> dest_bucket;
815 std::unique_ptr<rgw::sal::Object> dest_obj;
816
817 rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
818 tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
819 tier_ctx.target_storage_class);
820 RGWBucketInfo b;
821 std::string target_obj_name;
822 off_t end;
823
824 b.bucket.name = tier_ctx.target_bucket_name;
825 target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
826 tier_ctx.obj->get_name();
827 if (!tier_ctx.o.is_current()) {
828 target_obj_name += get_key_instance(tier_ctx.obj->get_key());
829 }
830
831 ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
832 if (ret < 0) {
833 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl;
834 return ret;
835 }
836
837 dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
838 if (!dest_obj) {
839 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
840 return -1;
841 }
842
843 tier_ctx.obj->set_atomic(&tier_ctx.rctx);
844
845 /* TODO: Define readf, writef as stack variables. For some reason,
846 * when used as stack variables (esp., readf), the transition seems to
847 * be taking lot of time eventually erroring out at times. */
848 std::shared_ptr<RGWLCStreamRead> readf;
849 readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
850 tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
851
852 std::shared_ptr<RGWLCCloudStreamPut> writef;
853 writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
854 dest_obj.get()));
855
856 /* Prepare Read from source */
857 end = part_info.ofs + part_info.size - 1;
858 readf->set_multipart(part_info.size, part_info.ofs, end);
859
860 /* Prepare write */
861 writef->set_multipart(upload_id, part_info.part_num, part_info.size);
862
863 /* actual Read & Write */
864 ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
865 if (ret < 0) {
866 return ret;
867 }
868
869 if (!(writef->get_etag(petag))) {
870 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl;
871 return -EIO;
872 }
873
874 return 0;
875 }
876
877 static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
878 RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
879 const std::string& upload_id) {
880 int ret;
881 bufferlist out_bl;
882 bufferlist bl;
883 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
884
885 string resource = obj_to_aws_path(dest_obj);
886 ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr,
887 out_bl, &bl, nullptr, null_yield);
888
889
890 if (ret < 0) {
891 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (ret=" << ret << ")" << dendl;
892 return ret;
893 }
894
895 return 0;
896 }
897
898 static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
899 RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
900 uint64_t obj_size, std::map<std::string, std::string>& attrs,
901 std::string& upload_id) {
902 bufferlist out_bl;
903 bufferlist bl;
904
905 struct InitMultipartResult {
906 std::string bucket;
907 std::string key;
908 std::string upload_id;
909
910 void decode_xml(XMLObj *obj) {
911 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
912 RGWXMLDecoder::decode_xml("Key", key, obj);
913 RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
914 }
915 } result;
916
917 int ret;
918 rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
919
920 string resource = obj_to_aws_path(dest_obj);
921
922 ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs,
923 out_bl, &bl, nullptr, null_yield);
924
925 if (ret < 0) {
926 ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
927 return ret;
928 }
929 /*
930 * If one of the following fails we cannot abort upload, as we cannot
931 * extract the upload id. If one of these fail it's very likely that that's
932 * the least of our problem.
933 */
934 RGWXMLDecoder::XMLParser parser;
935 if (!parser.init()) {
936 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
937 return -EIO;
938 }
939
940 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
941 string str(out_bl.c_str(), out_bl.length());
942 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml initmultipart: " << str << dendl;
943 return -EIO;
944 }
945
946 try {
947 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
948 } catch (RGWXMLDecoder::err& err) {
949 string str(out_bl.c_str(), out_bl.length());
950 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
951 return -EIO;
952 }
953
954 ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
955
956 upload_id = result.upload_id;
957
958 return 0;
959 }
960
961 static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp,
962 RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
963 std::string& upload_id,
964 const std::map<int, rgw_lc_multipart_part_info>& parts) {
965 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
966
967 stringstream ss;
968 XMLFormatter formatter;
969 int ret;
970
971 bufferlist bl, out_bl;
972 string resource = obj_to_aws_path(dest_obj);
973
974 struct CompleteMultipartReq {
975 std::map<int, rgw_lc_multipart_part_info> parts;
976
977 explicit CompleteMultipartReq(const std::map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
978
979 void dump_xml(Formatter *f) const {
980 for (const auto& p : parts) {
981 f->open_object_section("Part");
982 encode_xml("PartNumber", p.first, f);
983 encode_xml("ETag", p.second.etag, f);
984 f->close_section();
985 };
986 }
987 } req_enc(parts);
988
989 struct CompleteMultipartResult {
990 std::string location;
991 std::string bucket;
992 std::string key;
993 std::string etag;
994
995 void decode_xml(XMLObj *obj) {
996 RGWXMLDecoder::decode_xml("Location", bucket, obj);
997 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
998 RGWXMLDecoder::decode_xml("Key", key, obj);
999 RGWXMLDecoder::decode_xml("ETag", etag, obj);
1000 }
1001 } result;
1002
1003 encode_xml("CompleteMultipartUpload", req_enc, &formatter);
1004
1005 formatter.flush(ss);
1006 bl.append(ss.str());
1007
1008 ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
1009 out_bl, &bl, nullptr, null_yield);
1010
1011
1012 if (ret < 0) {
1013 ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl;
1014 return ret;
1015 }
1016 /*
1017 * If one of the following fails we cannot abort upload, as we cannot
1018 * extract the upload id. If one of these fail it's very likely that that's
1019 * the least of our problem.
1020 */
1021 RGWXMLDecoder::XMLParser parser;
1022 if (!parser.init()) {
1023 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1024 return -EIO;
1025 }
1026
1027 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1028 string str(out_bl.c_str(), out_bl.length());
1029 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml Completemultipart: " << str << dendl;
1030 return -EIO;
1031 }
1032
1033 try {
1034 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
1035 } catch (RGWXMLDecoder::err& err) {
1036 string str(out_bl.c_str(), out_bl.length());
1037 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1038 return -EIO;
1039 }
1040
1041 ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
1042
1043 return ret;
1044 }
1045
1046 static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx& tier_ctx,
1047 const rgw_obj& dest_obj, const rgw_raw_obj& status_obj,
1048 const std::string& upload_id) {
1049 int ret;
1050
1051 ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id);
1052
1053 if (ret < 0) {
1054 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl;
1055 /* ignore error, best effort */
1056 }
1057 /* remove status obj */
1058 ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj);
1059 if (ret < 0) {
1060 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl;
1061 // ignore error, best effort
1062 }
1063 return 0;
1064 }
1065
1066 static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
1067 rgw_obj src_obj;
1068 rgw_obj dest_obj;
1069
1070 uint64_t obj_size;
1071 std::string src_etag;
1072 rgw_rest_obj rest_obj;
1073
1074 rgw_lc_multipart_upload_info status;
1075
1076 std::map<std::string, std::string> new_attrs;
1077
1078 rgw_raw_obj status_obj;
1079
1080 RGWBucketInfo b;
1081 std::string target_obj_name;
1082 rgw_bucket target_bucket;
1083
1084 int ret;
1085
1086 rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
1087 tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
1088 tier_ctx.target_storage_class);
1089
1090 uint32_t part_size{0};
1091 uint32_t num_parts{0};
1092
1093 int cur_part{0};
1094 uint64_t cur_ofs{0};
1095 std::map<int, rgw_lc_multipart_part_info> parts;
1096
1097 obj_size = tier_ctx.o.meta.size;
1098
1099 target_bucket.name = tier_ctx.target_bucket_name;
1100
1101 target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
1102 tier_ctx.obj->get_name();
1103 if (!tier_ctx.o.is_current()) {
1104 target_obj_name += get_key_instance(tier_ctx.obj->get_key());
1105 }
1106 dest_obj.init(target_bucket, target_obj_name);
1107
1108 status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
1109 "lc_multipart_" + tier_ctx.obj->get_oid());
1110
1111 ret = read_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status);
1112
1113 if (ret < 0 && ret != -ENOENT) {
1114 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl;
1115 return ret;
1116 }
1117
1118 if (ret >= 0) {
1119 // check here that mtime and size did not change
1120 if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
1121 status.etag != obj_properties.etag) {
1122 cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
1123 ret = -ENOENT;
1124 }
1125 }
1126
1127 if (ret == -ENOENT) {
1128 RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime);
1129
1130 readf.init();
1131
1132 rest_obj = readf.get_rest_obj();
1133
1134 RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs);
1135
1136 ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id);
1137 if (ret < 0) {
1138 return ret;
1139 }
1140
1141 status.obj_size = obj_size;
1142 status.mtime = obj_properties.mtime;
1143 status.etag = obj_properties.etag;
1144
1145 ret = put_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status);
1146
1147 if (ret < 0) {
1148 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to store multipart upload state, ret=" << ret << dendl;
1149 // continue with upload anyway
1150 }
1151
1152 #define MULTIPART_MAX_PARTS 10000
1153 #define MULTIPART_MAX_PARTS 10000
1154 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
1155 uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
1156
1157 if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
1158 min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
1159 }
1160
1161 part_size = std::max(min_conf_size, min_part_size);
1162 num_parts = (obj_size + part_size - 1) / part_size;
1163 cur_part = 1;
1164 cur_ofs = 0;
1165 }
1166
1167 for (; (uint32_t)cur_part <= num_parts; ++cur_part) {
1168 ldpp_dout(tier_ctx.dpp, 20) << "cur_part = "<< cur_part << ", info.ofs = " << cur_ofs << ", info.size = " << part_size << ", obj size = " << obj_size<< ", num_parts:" << num_parts << dendl;
1169 rgw_lc_multipart_part_info& cur_part_info = parts[cur_part];
1170 cur_part_info.part_num = cur_part;
1171 cur_part_info.ofs = cur_ofs;
1172 cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
1173
1174 cur_ofs += cur_part_info.size;
1175
1176 ret = cloud_tier_send_multipart_part(tier_ctx,
1177 status.upload_id,
1178 cur_part_info,
1179 &cur_part_info.etag);
1180
1181 if (ret < 0) {
1182 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to send multipart part of obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-ret) << ")" << dendl;
1183 cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
1184 return ret;
1185 }
1186
1187 }
1188
1189 ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts);
1190 if (ret < 0) {
1191 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl;
1192 cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
1193 return ret;
1194 }
1195
1196 /* remove status obj */
1197 ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj);
1198 if (ret < 0) {
1199 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl;
1200 // ignore error, best effort
1201 }
1202 return 0;
1203 }
1204
1205 /* Check if object has already been transitioned */
1206 static int cloud_tier_check_object(RGWLCCloudTierCtx& tier_ctx, bool& already_tiered) {
1207 int ret;
1208 std::map<std::string, std::string> headers;
1209
1210 /* Fetch Head object */
1211 ret = cloud_tier_get_object(tier_ctx, true, headers);
1212
1213 if (ret < 0) {
1214 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , ret = " << ret << dendl;
1215 return ret;
1216 }
1217
1218 already_tiered = is_already_tiered(tier_ctx.dpp, headers, tier_ctx.o.meta.mtime);
1219
1220 if (already_tiered) {
1221 ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered true" << dendl;
1222 } else {
1223 ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered false..going with out_crf writing" << dendl;
1224 }
1225
1226 return ret;
1227 }
1228
1229 static int cloud_tier_create_bucket(RGWLCCloudTierCtx& tier_ctx) {
1230 bufferlist out_bl;
1231 int ret = 0;
1232 pair<string, string> key(tier_ctx.storage_class, tier_ctx.target_bucket_name);
1233 struct CreateBucketResult {
1234 std::string code;
1235
1236 void decode_xml(XMLObj *obj) {
1237 RGWXMLDecoder::decode_xml("Code", code, obj);
1238 }
1239 } result;
1240
1241 ldpp_dout(tier_ctx.dpp, 30) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl;
1242 bufferlist bl;
1243 string resource = tier_ctx.target_bucket_name;
1244
1245 ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr,
1246 out_bl, &bl, nullptr, null_yield);
1247
1248 if (ret < 0 ) {
1249 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", ret:" << ret << dendl;
1250 return ret;
1251 }
1252 if (out_bl.length() > 0) {
1253 RGWXMLDecoder::XMLParser parser;
1254 if (!parser.init()) {
1255 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
1256 return -EIO;
1257 }
1258
1259 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1260 string str(out_bl.c_str(), out_bl.length());
1261 ldpp_dout(tier_ctx.dpp, 5) << "ERROR: failed to parse xml createbucket: " << str << dendl;
1262 return -EIO;
1263 }
1264
1265 try {
1266 RGWXMLDecoder::decode_xml("Error", result, &parser, true);
1267 } catch (RGWXMLDecoder::err& err) {
1268 string str(out_bl.c_str(), out_bl.length());
1269 ldpp_dout(tier_ctx.dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1270 return -EIO;
1271 }
1272
1273 if (result.code != "BucketAlreadyOwnedByYou") {
1274 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl;
1275 return -EIO;
1276 }
1277 }
1278
1279 return 0;
1280 }
1281
1282 int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx& tier_ctx) {
1283 int ret = 0;
1284
1285 /* If run first time attempt to create the target bucket */
1286 if (!tier_ctx.target_bucket_created) {
1287 ret = cloud_tier_create_bucket(tier_ctx);
1288
1289 if (ret < 0) {
1290 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket on the cloud endpoint ret=" << ret << dendl;
1291 return ret;
1292 }
1293 tier_ctx.target_bucket_created = true;
1294 }
1295
1296 /* Since multiple zones may try to transition the same object to the cloud,
1297 * verify if the object is already transitioned. And since its just a best
1298 * effort, do not bail out in case of any errors.
1299 */
1300 bool already_tiered = false;
1301 ret = cloud_tier_check_object(tier_ctx, already_tiered);
1302
1303 if (ret < 0) {
1304 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to check object on the cloud endpoint ret=" << ret << dendl;
1305 }
1306
1307 if (already_tiered) {
1308 ldpp_dout(tier_ctx.dpp, 20) << "Object (" << tier_ctx.o.key << ") is already tiered" << dendl;
1309 return 0;
1310 }
1311
1312 uint64_t size = tier_ctx.o.meta.size;
1313 uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
1314
1315 if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
1316 multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE;
1317 }
1318
1319 if (size < multipart_sync_threshold) {
1320 ret = cloud_tier_plain_transfer(tier_ctx);
1321 } else {
1322 tier_ctx.is_multipart_upload = true;
1323 ret = cloud_tier_multipart_transfer(tier_ctx);
1324 }
1325
1326 if (ret < 0) {
1327 ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to transition object ret=" << ret << dendl;
1328 }
1329
1330 return ret;
1331 }