1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include "common/Formatter.h"
9 #include <common/errno.h>
11 #include "rgw_lc_tier.h"
12 #include "rgw_string.h"
14 #include "rgw_common.h"
18 #include <boost/algorithm/string/split.hpp>
19 #include <boost/algorithm/string.hpp>
20 #include <boost/algorithm/string/predicate.hpp>
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
27 struct rgw_lc_multipart_part_info
{
34 struct rgw_lc_obj_properties
{
35 ceph::real_time mtime
;
37 uint64_t versioned_epoch
{0};
38 std::map
<std::string
, RGWTierACLMapping
>& target_acl_mappings
;
39 std::string target_storage_class
;
41 rgw_lc_obj_properties(ceph::real_time _mtime
, std::string _etag
,
42 uint64_t _versioned_epoch
, std::map
<std::string
,
43 RGWTierACLMapping
>& _t_acl_mappings
,
44 std::string _t_storage_class
) :
45 mtime(_mtime
), etag(_etag
),
46 versioned_epoch(_versioned_epoch
),
47 target_acl_mappings(_t_acl_mappings
),
48 target_storage_class(_t_storage_class
) {}
51 struct rgw_lc_multipart_upload_info
{
52 std::string upload_id
;
54 ceph::real_time mtime
;
57 void encode(bufferlist
& bl
) const {
58 ENCODE_START(1, 1, bl
);
59 encode(upload_id
, bl
);
66 void decode(bufferlist::const_iterator
& bl
) {
68 decode(upload_id
, bl
);
75 WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info
)
77 static inline string
get_key_instance(const rgw_obj_key
& key
)
79 if (!key
.instance
.empty() &&
80 !key
.have_null_instance()) {
81 return "-" + key
.instance
;
86 static inline string
get_key_oid(const rgw_obj_key
& key
)
88 string oid
= key
.name
;
89 if (!key
.instance
.empty() &&
90 !key
.have_null_instance()) {
91 oid
+= string("-") + key
.instance
;
96 static inline string
obj_to_aws_path(const rgw_obj
& obj
)
98 string path
= obj
.bucket
.name
+ "/" + get_key_oid(obj
.key
);
102 static int read_upload_status(const DoutPrefixProvider
*dpp
, rgw::sal::Store
*store
,
103 const rgw_raw_obj
*status_obj
, rgw_lc_multipart_upload_info
*status
)
106 rgw::sal::RadosStore
*rados
= dynamic_cast<rgw::sal::RadosStore
*>(store
);
109 ldpp_dout(dpp
, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl
;
113 auto& pool
= status_obj
->pool
;
114 const auto oid
= status_obj
->oid
;
115 auto obj_ctx
= rados
->svc()->sysobj
->init_obj_ctx();
118 ret
= rgw_get_system_obj(obj_ctx
, pool
, oid
, bl
, nullptr, nullptr,
125 if (bl
.length() > 0) {
127 auto p
= bl
.cbegin();
129 } catch (buffer::error
& e
) {
130 ldpp_dout(dpp
, 10) << "failed to decode status obj: "
131 << e
.what() << dendl
;
141 static int put_upload_status(const DoutPrefixProvider
*dpp
, rgw::sal::Store
*store
,
142 const rgw_raw_obj
*status_obj
, rgw_lc_multipart_upload_info
*status
)
145 rgw::sal::RadosStore
*rados
= dynamic_cast<rgw::sal::RadosStore
*>(store
);
148 ldpp_dout(dpp
, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl
;
152 auto& pool
= status_obj
->pool
;
153 const auto oid
= status_obj
->oid
;
154 auto obj_ctx
= rados
->svc()->sysobj
->init_obj_ctx();
158 ret
= rgw_put_system_obj(dpp
, obj_ctx
, pool
, oid
, bl
, true, nullptr,
159 real_time
{}, null_yield
);
164 static int delete_upload_status(const DoutPrefixProvider
*dpp
, rgw::sal::Store
*store
,
165 const rgw_raw_obj
*status_obj
)
168 rgw::sal::RadosStore
*rados
= dynamic_cast<rgw::sal::RadosStore
*>(store
);
171 ldpp_dout(dpp
, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl
;
175 auto& pool
= status_obj
->pool
;
176 const auto oid
= status_obj
->oid
;
177 auto sysobj
= rados
->svc()->sysobj
;
179 ret
= rgw_delete_system_obj(dpp
, sysobj
, pool
, oid
, nullptr, null_yield
);
184 static std::set
<string
> keep_headers
= { "CONTENT_TYPE",
186 "CONTENT_DISPOSITION",
187 "CONTENT_LANGUAGE" };
190 * mapping between rgw object attrs and output http fields
192 static const struct rgw_http_attr base_rgw_to_http_attrs[] = {
193 { RGW_ATTR_CONTENT_LANG, "Content-Language" },
194 { RGW_ATTR_EXPIRES, "Expires" },
195 { RGW_ATTR_CACHE_CONTROL, "Cache-Control" },
196 { RGW_ATTR_CONTENT_DISP, "Content-Disposition" },
197 { RGW_ATTR_CONTENT_ENC, "Content-Encoding" },
198 { RGW_ATTR_USER_MANIFEST, "X-Object-Manifest" },
199 { RGW_ATTR_X_ROBOTS_TAG , "X-Robots-Tag" },
200 { RGW_ATTR_STORAGE_CLASS , "X-Amz-Storage-Class" },
201 // RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION header depends on access mode:
202 // S3 endpoint: x-amz-website-redirect-location
203 // S3Website endpoint: Location
204 { RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION, "x-amz-website-redirect-location" },
207 static void init_headers(map
<string
, bufferlist
>& attrs
,
208 map
<string
, string
>& headers
)
210 for (auto& kv
: attrs
) {
211 const char * name
= kv
.first
.c_str();
212 const auto aiter
= rgw_to_http_attrs
.find(name
);
214 if (aiter
!= std::end(rgw_to_http_attrs
)) {
215 headers
[aiter
->second
] = rgw_bl_str(kv
.second
);
216 } else if (strncmp(name
, RGW_ATTR_META_PREFIX
,
217 sizeof(RGW_ATTR_META_PREFIX
)-1) == 0) {
218 name
+= sizeof(RGW_ATTR_META_PREFIX
) - 1;
220 string name_prefix
= RGW_ATTR_META_PREFIX
;
221 char full_name_buf
[name_prefix
.size() + sname
.size() + 1];
222 snprintf(full_name_buf
, sizeof(full_name_buf
), "%.*s%.*s",
223 static_cast<int>(name_prefix
.length()),
225 static_cast<int>(sname
.length()),
227 headers
[full_name_buf
] = rgw_bl_str(kv
.second
);
228 } else if (strcmp(name
,RGW_ATTR_CONTENT_TYPE
) == 0) {
229 headers
["CONTENT_TYPE"] = rgw_bl_str(kv
.second
);
234 /* Read object or just head from remote endpoint. For now initializes only headers,
235 * but can be extended to fetch etag, mtime etc if needed.
237 static int cloud_tier_get_object(RGWLCCloudTierCtx
& tier_ctx
, bool head
,
238 std::map
<std::string
, std::string
>& headers
) {
239 RGWRESTConn::get_obj_params req_params
;
241 std::string target_obj_name
;
243 std::unique_ptr
<rgw::sal::Bucket
> dest_bucket
;
244 std::unique_ptr
<rgw::sal::Object
> dest_obj
;
245 rgw_lc_obj_properties
obj_properties(tier_ctx
.o
.meta
.mtime
, tier_ctx
.o
.meta
.etag
,
246 tier_ctx
.o
.versioned_epoch
, tier_ctx
.acl_mappings
,
247 tier_ctx
.target_storage_class
);
249 RGWRESTStreamRWRequest
*in_req
;
251 b
.bucket
.name
= tier_ctx
.target_bucket_name
;
252 target_obj_name
= tier_ctx
.bucket_info
.bucket
.name
+ "/" +
253 tier_ctx
.obj
->get_name();
254 if (!tier_ctx
.o
.is_current()) {
255 target_obj_name
+= get_key_instance(tier_ctx
.obj
->get_key());
258 ret
= tier_ctx
.store
->get_bucket(nullptr, b
, &dest_bucket
);
260 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx
.target_bucket_name
<< " , reterr = " << ret
<< dendl
;
264 dest_obj
= dest_bucket
->get_object(rgw_obj_key(target_obj_name
));
266 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name
<< dendl
;
269 /* init input connection */
270 req_params
.get_op
= !head
;
271 req_params
.prepend_metadata
= true;
272 req_params
.rgwx_stat
= true;
273 req_params
.sync_manifest
= true;
274 req_params
.skip_decrypt
= true;
276 ret
= tier_ctx
.conn
.get_obj(tier_ctx
.dpp
, dest_obj
.get(), req_params
, true /* send */, &in_req
);
278 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: " << __func__
<< "(): conn.get_obj() returned ret=" << ret
<< dendl
;
283 ret
= tier_ctx
.conn
.complete_request(in_req
, nullptr, nullptr, nullptr, nullptr, &headers
, null_yield
);
284 if (ret
< 0 && ret
!= -ENOENT
) {
285 ldpp_dout(tier_ctx
.dpp
, 20) << "ERROR: " << __func__
<< "(): conn.complete_request() returned ret=" << ret
<< dendl
;
291 static bool is_already_tiered(const DoutPrefixProvider
*dpp
,
292 std::map
<std::string
, std::string
>& headers
,
293 ceph::real_time
& mtime
) {
295 map
<string
, string
> attrs
= headers
;
297 for (const auto& a
: attrs
) {
298 ldpp_dout(dpp
, 20) << "GetCrf attr[" << a
.first
<< "] = " << a
.second
<<dendl
;
301 snprintf(buf
, sizeof(buf
), "%lld.%09lld",
303 (long long)ut
.nsec());
305 string s
= attrs
["X_AMZ_META_RGWX_SOURCE_MTIME"];
308 s
= attrs
["x_amz_meta_rgwx_source_mtime"];
310 ldpp_dout(dpp
, 20) << "is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s
<<dendl
;
311 ldpp_dout(dpp
, 20) << "is_already_tiered mtime buf = " << buf
<<dendl
;
313 if (!s
.empty() && !strcmp(s
.c_str(), buf
)){
319 /* Read object locally & also initialize dest rest obj based on read attrs */
320 class RGWLCStreamRead
323 const DoutPrefixProvider
*dpp
;
324 std::map
<std::string
, bufferlist
> attrs
;
326 rgw::sal::Object
*obj
;
327 const real_time
&mtime
;
330 uint64_t m_part_size
;
334 std::unique_ptr
<rgw::sal::Object::ReadOp
> read_op
;
337 rgw_rest_obj rest_obj
;
342 RGWLCStreamRead(CephContext
*_cct
, const DoutPrefixProvider
*_dpp
,
343 RGWObjectCtx
& obj_ctx
, rgw::sal::Object
*_obj
,
344 const real_time
&_mtime
) :
345 cct(_cct
), dpp(_dpp
), obj(_obj
), mtime(_mtime
),
346 read_op(obj
->get_read_op(&obj_ctx
)) {}
348 ~RGWLCStreamRead() {};
349 int set_range(off_t _ofs
, off_t _end
);
350 int get_range(off_t
&_ofs
, off_t
&_end
);
351 rgw_rest_obj
& get_rest_obj();
352 void set_multipart(uint64_t part_size
, off_t part_off
, off_t part_end
);
355 int read(off_t ofs
, off_t end
, RGWGetDataCB
*out_cb
);
358 /* Send PUT op to remote endpoint */
359 class RGWLCCloudStreamPut
361 const DoutPrefixProvider
*dpp
;
362 rgw_lc_obj_properties obj_properties
;
364 rgw::sal::Object
*dest_obj
;
366 RGWRESTStreamS3PutObj
*out_req
{nullptr};
368 struct multipart_info
{
369 bool is_multipart
{false};
370 std::string upload_id
;
378 RGWLCCloudStreamPut(const DoutPrefixProvider
*_dpp
,
379 const rgw_lc_obj_properties
& _obj_properties
,
381 rgw::sal::Object
*_dest_obj
) :
382 dpp(_dpp
), obj_properties(_obj_properties
), conn(_conn
), dest_obj(_dest_obj
) {
385 static bool keep_attr(const std::string
& h
);
386 static void init_send_attrs(const DoutPrefixProvider
*dpp
, const rgw_rest_obj
& rest_obj
,
387 const rgw_lc_obj_properties
& obj_properties
,
388 std::map
<std::string
, std::string
>& attrs
);
389 void send_ready(const DoutPrefixProvider
*dpp
, const rgw_rest_obj
& rest_obj
);
390 void handle_headers(const std::map
<std::string
, std::string
>& headers
);
391 bool get_etag(std::string
*petag
);
392 void set_multipart(const std::string
& upload_id
, int part_num
, uint64_t part_size
);
394 RGWGetDataCB
*get_cb();
395 int complete_request();
398 int RGWLCStreamRead::set_range(off_t _ofs
, off_t _end
) {
405 int RGWLCStreamRead::get_range(off_t
&_ofs
, off_t
&_end
) {
412 rgw_rest_obj
& RGWLCStreamRead::get_rest_obj() {
416 void RGWLCStreamRead::set_multipart(uint64_t part_size
, off_t part_off
, off_t part_end
) {
418 m_part_size
= part_size
;
419 m_part_off
= part_off
;
420 m_part_end
= part_end
;
423 int RGWLCStreamRead::init() {
424 optional_yield y
= null_yield
;
425 real_time read_mtime
;
427 read_op
->params
.lastmod
= &read_mtime
;
429 int ret
= read_op
->prepare(y
, dpp
);
431 ldpp_dout(dpp
, 0) << "ERROR: fail to prepare read_op, ret = " << ret
<< dendl
;
435 if (read_mtime
!= mtime
) {
440 attrs
= obj
->get_attrs();
441 obj_size
= obj
->get_obj_size();
443 ret
= init_rest_obj();
445 ldpp_dout(dpp
, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret
<< dendl
;
450 set_range(0, obj_size
- 1);
452 set_range(m_part_off
, m_part_end
);
457 int RGWLCStreamRead::init_rest_obj() {
458 /* Initialize rgw_rest_obj.
459 * Reference: do_decode_rest_obj
460 * Check how to copy headers content */
461 rest_obj
.init(obj
->get_key());
464 rest_obj
.content_len
= obj_size
;
466 rest_obj
.content_len
= m_part_size
;
469 /* For mulitpart attrs are sent as part of InitMultipartCR itself */
475 * XXX: verify if its right way to copy attrs into rest obj
477 init_headers(attrs
, rest_obj
.attrs
);
479 rest_obj
.acls
.set_ctx(cct
);
480 const auto aiter
= attrs
.find(RGW_ATTR_ACL
);
481 if (aiter
!= attrs
.end()) {
482 bufferlist
& bl
= aiter
->second
;
483 auto bliter
= bl
.cbegin();
485 rest_obj
.acls
.decode(bliter
);
486 } catch (buffer::error
& err
) {
487 ldpp_dout(dpp
, 0) << "ERROR: failed to decode policy off attrs" << dendl
;
491 ldpp_dout(dpp
, 0) << "WARNING: acl attrs not provided" << dendl
;
496 int RGWLCStreamRead::read(off_t ofs
, off_t end
, RGWGetDataCB
*out_cb
) {
497 int ret
= read_op
->iterate(dpp
, ofs
, end
, out_cb
, null_yield
);
501 int RGWLCCloudStreamPut::init() {
502 /* init output connection */
503 if (multipart
.is_multipart
) {
505 snprintf(buf
, sizeof(buf
), "%d", multipart
.part_num
);
506 rgw_http_param_pair params
[] = { { "uploadId", multipart
.upload_id
.c_str() },
507 { "partNumber", buf
},
508 { nullptr, nullptr } };
509 conn
.put_obj_send_init(dest_obj
, params
, &out_req
);
511 conn
.put_obj_send_init(dest_obj
, nullptr, &out_req
);
517 bool RGWLCCloudStreamPut::keep_attr(const string
& h
) {
518 return (keep_headers
.find(h
) != keep_headers
.end() ||
519 boost::algorithm::starts_with(h
, "X_AMZ_"));
522 void RGWLCCloudStreamPut::init_send_attrs(const DoutPrefixProvider
*dpp
,
523 const rgw_rest_obj
& rest_obj
,
524 const rgw_lc_obj_properties
& obj_properties
,
525 std::map
<string
, string
>& attrs
) {
527 map
<string
, RGWTierACLMapping
>& acl_mappings(obj_properties
.target_acl_mappings
);
528 const std::string
& target_storage_class
= obj_properties
.target_storage_class
;
532 for (auto& hi
: rest_obj
.attrs
) {
533 if (keep_attr(hi
.first
)) {
538 const auto acl
= rest_obj
.acls
.get_acl();
540 map
<int, vector
<string
> > access_map
;
542 if (!acl_mappings
.empty()) {
543 for (auto& grant
: acl
.get_grant_map()) {
544 auto& orig_grantee
= grant
.first
;
545 auto& perm
= grant
.second
;
549 const auto& am
= acl_mappings
;
551 const auto iter
= am
.find(orig_grantee
);
552 if (iter
== am
.end()) {
553 ldpp_dout(dpp
, 20) << "acl_mappings: Could not find " << orig_grantee
<< " .. ignoring" << dendl
;
557 grantee
= iter
->second
.dest_id
;
561 switch (iter
->second
.type
) {
562 case ACL_TYPE_CANON_USER
:
565 case ACL_TYPE_EMAIL_USER
:
566 type
= "emailAddress";
575 string tv
= type
+ "=" + grantee
;
577 int flags
= perm
.get_permission().get_permissions();
578 if ((flags
& RGW_PERM_FULL_CONTROL
) == RGW_PERM_FULL_CONTROL
) {
579 access_map
[flags
].push_back(tv
);
583 for (int i
= 1; i
<= RGW_PERM_WRITE_ACP
; i
<<= 1) {
585 access_map
[i
].push_back(tv
);
591 for (const auto& aiter
: access_map
) {
592 int grant_type
= aiter
.first
;
594 string
header_str("x-amz-grant-");
596 switch (grant_type
) {
598 header_str
.append("read");
601 header_str
.append("write");
603 case RGW_PERM_READ_ACP
:
604 header_str
.append("read-acp");
606 case RGW_PERM_WRITE_ACP
:
607 header_str
.append("write-acp");
609 case RGW_PERM_FULL_CONTROL
:
610 header_str
.append("full-control");
616 for (const auto& viter
: aiter
.second
) {
623 ldpp_dout(dpp
, 20) << "acl_mappings: set acl: " << header_str
<< "=" << s
<< dendl
;
625 attrs
[header_str
] = s
;
628 /* Copy target storage class */
629 if (!target_storage_class
.empty()) {
630 attrs
["x-amz-storage-class"] = target_storage_class
;
632 attrs
["x-amz-storage-class"] = "STANDARD";
635 /* New attribute to specify its transitioned from RGW */
636 attrs
["x-amz-meta-rgwx-source"] = "rgw";
639 snprintf(buf
, sizeof(buf
), "%llu", (long long)obj_properties
.versioned_epoch
);
640 attrs
["x-amz-meta-rgwx-versioned-epoch"] = buf
;
642 utime_t
ut(obj_properties
.mtime
);
643 snprintf(buf
, sizeof(buf
), "%lld.%09lld",
645 (long long)ut
.nsec());
647 attrs
["x-amz-meta-rgwx-source-mtime"] = buf
;
648 attrs
["x-amz-meta-rgwx-source-etag"] = obj_properties
.etag
;
649 attrs
["x-amz-meta-rgwx-source-key"] = rest_obj
.key
.name
;
650 if (!rest_obj
.key
.instance
.empty()) {
651 attrs
["x-amz-meta-rgwx-source-version-id"] = rest_obj
.key
.instance
;
653 for (const auto& a
: attrs
) {
654 ldpp_dout(dpp
, 30) << "init_send_attrs attr[" << a
.first
<< "] = " << a
.second
<<dendl
;
658 void RGWLCCloudStreamPut::send_ready(const DoutPrefixProvider
*dpp
, const rgw_rest_obj
& rest_obj
) {
659 auto r
= static_cast<RGWRESTStreamS3PutObj
*>(out_req
);
661 std::map
<std::string
, std::string
> new_attrs
;
662 if (!multipart
.is_multipart
) {
663 init_send_attrs(dpp
, rest_obj
, obj_properties
, new_attrs
);
666 r
->set_send_length(rest_obj
.content_len
);
668 RGWAccessControlPolicy policy
;
670 r
->send_ready(dpp
, conn
.get_key(), new_attrs
, policy
);
673 void RGWLCCloudStreamPut::handle_headers(const map
<string
, string
>& headers
) {
674 for (const auto& h
: headers
) {
675 if (h
.first
== "ETAG") {
681 bool RGWLCCloudStreamPut::get_etag(string
*petag
) {
689 void RGWLCCloudStreamPut::set_multipart(const string
& upload_id
, int part_num
, uint64_t part_size
) {
690 multipart
.is_multipart
= true;
691 multipart
.upload_id
= upload_id
;
692 multipart
.part_num
= part_num
;
693 multipart
.part_size
= part_size
;
696 int RGWLCCloudStreamPut::send() {
697 int ret
= RGWHTTP::send(out_req
);
701 RGWGetDataCB
*RGWLCCloudStreamPut::get_cb() {
702 return out_req
->get_out_cb();
705 int RGWLCCloudStreamPut::complete_request() {
706 int ret
= conn
.complete_request(out_req
, etag
, &obj_properties
.mtime
, null_yield
);
710 /* Read local copy and write to Cloud endpoint */
711 static int cloud_tier_transfer_object(const DoutPrefixProvider
* dpp
,
712 RGWLCStreamRead
* readf
, RGWLCCloudStreamPut
* writef
) {
715 bool sent_attrs
{false};
722 ldpp_dout(dpp
, 0) << "ERROR: fail to initialize in_crf, ret = " << ret
<< dendl
;
725 readf
->get_range(ofs
, end
);
726 rgw_rest_obj
& rest_obj
= readf
->get_rest_obj();
728 ret
= writef
->init();
730 ldpp_dout(dpp
, 0) << "ERROR: fail to initialize out_crf, ret = " << ret
<< dendl
;
734 writef
->send_ready(dpp
, rest_obj
);
735 ret
= writef
->send();
742 ret
= readf
->read(ofs
, end
, writef
->get_cb());
745 ldpp_dout(dpp
, 0) << "ERROR: fail to read from in_crf, ret = " << ret
<< dendl
;
749 ret
= writef
->complete_request();
751 ldpp_dout(dpp
, 0) << "ERROR: fail to complete request, ret = " << ret
<< dendl
;
758 static int cloud_tier_plain_transfer(RGWLCCloudTierCtx
& tier_ctx
) {
760 std::unique_ptr
<rgw::sal::Bucket
> dest_bucket
;
761 std::unique_ptr
<rgw::sal::Object
> dest_obj
;
763 rgw_lc_obj_properties
obj_properties(tier_ctx
.o
.meta
.mtime
, tier_ctx
.o
.meta
.etag
,
764 tier_ctx
.o
.versioned_epoch
, tier_ctx
.acl_mappings
,
765 tier_ctx
.target_storage_class
);
767 std::string target_obj_name
;
769 b
.bucket
.name
= tier_ctx
.target_bucket_name
;
770 target_obj_name
= tier_ctx
.bucket_info
.bucket
.name
+ "/" +
771 tier_ctx
.obj
->get_name();
772 if (!tier_ctx
.o
.is_current()) {
773 target_obj_name
+= get_key_instance(tier_ctx
.obj
->get_key());
776 ret
= tier_ctx
.store
->get_bucket(nullptr, b
, &dest_bucket
);
778 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx
.target_bucket_name
<< " , ret = " << ret
<< dendl
;
782 dest_obj
= dest_bucket
->get_object(rgw_obj_key(target_obj_name
));
784 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name
<< dendl
;
788 tier_ctx
.obj
->set_atomic(&tier_ctx
.rctx
);
790 /* Prepare Read from source */
791 /* TODO: Define readf, writef as stack variables. For some reason,
792 * when used as stack variables (esp., readf), the transition seems to
793 * be taking lot of time eventually erroring out at times.
795 std::shared_ptr
<RGWLCStreamRead
> readf
;
796 readf
.reset(new RGWLCStreamRead(tier_ctx
.cct
, tier_ctx
.dpp
,
797 tier_ctx
.rctx
, tier_ctx
.obj
, tier_ctx
.o
.meta
.mtime
));
799 std::shared_ptr
<RGWLCCloudStreamPut
> writef
;
800 writef
.reset(new RGWLCCloudStreamPut(tier_ctx
.dpp
, obj_properties
, tier_ctx
.conn
,
803 /* actual Read & Write */
804 ret
= cloud_tier_transfer_object(tier_ctx
.dpp
, readf
.get(), writef
.get());
809 static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx
& tier_ctx
,
810 const std::string
& upload_id
,
811 const rgw_lc_multipart_part_info
& part_info
,
812 std::string
*petag
) {
814 std::unique_ptr
<rgw::sal::Bucket
> dest_bucket
;
815 std::unique_ptr
<rgw::sal::Object
> dest_obj
;
817 rgw_lc_obj_properties
obj_properties(tier_ctx
.o
.meta
.mtime
, tier_ctx
.o
.meta
.etag
,
818 tier_ctx
.o
.versioned_epoch
, tier_ctx
.acl_mappings
,
819 tier_ctx
.target_storage_class
);
821 std::string target_obj_name
;
824 b
.bucket
.name
= tier_ctx
.target_bucket_name
;
825 target_obj_name
= tier_ctx
.bucket_info
.bucket
.name
+ "/" +
826 tier_ctx
.obj
->get_name();
827 if (!tier_ctx
.o
.is_current()) {
828 target_obj_name
+= get_key_instance(tier_ctx
.obj
->get_key());
831 ret
= tier_ctx
.store
->get_bucket(nullptr, b
, &dest_bucket
);
833 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx
.target_bucket_name
<< " , ret = " << ret
<< dendl
;
837 dest_obj
= dest_bucket
->get_object(rgw_obj_key(target_obj_name
));
839 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name
<< dendl
;
843 tier_ctx
.obj
->set_atomic(&tier_ctx
.rctx
);
845 /* TODO: Define readf, writef as stack variables. For some reason,
846 * when used as stack variables (esp., readf), the transition seems to
847 * be taking lot of time eventually erroring out at times. */
848 std::shared_ptr
<RGWLCStreamRead
> readf
;
849 readf
.reset(new RGWLCStreamRead(tier_ctx
.cct
, tier_ctx
.dpp
,
850 tier_ctx
.rctx
, tier_ctx
.obj
, tier_ctx
.o
.meta
.mtime
));
852 std::shared_ptr
<RGWLCCloudStreamPut
> writef
;
853 writef
.reset(new RGWLCCloudStreamPut(tier_ctx
.dpp
, obj_properties
, tier_ctx
.conn
,
856 /* Prepare Read from source */
857 end
= part_info
.ofs
+ part_info
.size
- 1;
858 readf
->set_multipart(part_info
.size
, part_info
.ofs
, end
);
861 writef
->set_multipart(upload_id
, part_info
.part_num
, part_info
.size
);
863 /* actual Read & Write */
864 ret
= cloud_tier_transfer_object(tier_ctx
.dpp
, readf
.get(), writef
.get());
869 if (!(writef
->get_etag(petag
))) {
870 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to get etag from PUT request" << dendl
;
877 static int cloud_tier_abort_multipart(const DoutPrefixProvider
*dpp
,
878 RGWRESTConn
& dest_conn
, const rgw_obj
& dest_obj
,
879 const std::string
& upload_id
) {
883 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
885 string resource
= obj_to_aws_path(dest_obj
);
886 ret
= dest_conn
.send_resource(dpp
, "DELETE", resource
, params
, nullptr,
887 out_bl
, &bl
, nullptr, null_yield
);
891 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj
<< " (ret=" << ret
<< ")" << dendl
;
898 static int cloud_tier_init_multipart(const DoutPrefixProvider
*dpp
,
899 RGWRESTConn
& dest_conn
, const rgw_obj
& dest_obj
,
900 uint64_t obj_size
, std::map
<std::string
, std::string
>& attrs
,
901 std::string
& upload_id
) {
905 struct InitMultipartResult
{
908 std::string upload_id
;
910 void decode_xml(XMLObj
*obj
) {
911 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
912 RGWXMLDecoder::decode_xml("Key", key
, obj
);
913 RGWXMLDecoder::decode_xml("UploadId", upload_id
, obj
);
918 rgw_http_param_pair params
[] = { { "uploads", nullptr }, {nullptr, nullptr} };
920 string resource
= obj_to_aws_path(dest_obj
);
922 ret
= dest_conn
.send_resource(dpp
, "POST", resource
, params
, &attrs
,
923 out_bl
, &bl
, nullptr, null_yield
);
926 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
930 * If one of the following fails we cannot abort upload, as we cannot
931 * extract the upload id. If one of these fail it's very likely that that's
932 * the least of our problem.
934 RGWXMLDecoder::XMLParser parser
;
935 if (!parser
.init()) {
936 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
940 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
941 string
str(out_bl
.c_str(), out_bl
.length());
942 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml initmultipart: " << str
<< dendl
;
947 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result
, &parser
, true);
948 } catch (RGWXMLDecoder::err
& err
) {
949 string
str(out_bl
.c_str(), out_bl
.length());
950 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
954 ldpp_dout(dpp
, 20) << "init multipart result: bucket=" << result
.bucket
<< " key=" << result
.key
<< " upload_id=" << result
.upload_id
<< dendl
;
956 upload_id
= result
.upload_id
;
961 static int cloud_tier_complete_multipart(const DoutPrefixProvider
*dpp
,
962 RGWRESTConn
& dest_conn
, const rgw_obj
& dest_obj
,
963 std::string
& upload_id
,
964 const std::map
<int, rgw_lc_multipart_part_info
>& parts
) {
965 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
968 XMLFormatter formatter
;
971 bufferlist bl
, out_bl
;
972 string resource
= obj_to_aws_path(dest_obj
);
974 struct CompleteMultipartReq
{
975 std::map
<int, rgw_lc_multipart_part_info
> parts
;
977 explicit CompleteMultipartReq(const std::map
<int, rgw_lc_multipart_part_info
>& _parts
) : parts(_parts
) {}
979 void dump_xml(Formatter
*f
) const {
980 for (const auto& p
: parts
) {
981 f
->open_object_section("Part");
982 encode_xml("PartNumber", p
.first
, f
);
983 encode_xml("ETag", p
.second
.etag
, f
);
989 struct CompleteMultipartResult
{
990 std::string location
;
995 void decode_xml(XMLObj
*obj
) {
996 RGWXMLDecoder::decode_xml("Location", bucket
, obj
);
997 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
998 RGWXMLDecoder::decode_xml("Key", key
, obj
);
999 RGWXMLDecoder::decode_xml("ETag", etag
, obj
);
1003 encode_xml("CompleteMultipartUpload", req_enc
, &formatter
);
1005 formatter
.flush(ss
);
1006 bl
.append(ss
.str());
1008 ret
= dest_conn
.send_resource(dpp
, "POST", resource
, params
, nullptr,
1009 out_bl
, &bl
, nullptr, null_yield
);
1013 ldpp_dout(dpp
, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj
<< dendl
;
1017 * If one of the following fails we cannot abort upload, as we cannot
1018 * extract the upload id. If one of these fail it's very likely that that's
1019 * the least of our problem.
1021 RGWXMLDecoder::XMLParser parser
;
1022 if (!parser
.init()) {
1023 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1027 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1028 string
str(out_bl
.c_str(), out_bl
.length());
1029 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml Completemultipart: " << str
<< dendl
;
1034 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result
, &parser
, true);
1035 } catch (RGWXMLDecoder::err
& err
) {
1036 string
str(out_bl
.c_str(), out_bl
.length());
1037 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1041 ldpp_dout(dpp
, 20) << "complete multipart result: location=" << result
.location
<< " bucket=" << result
.bucket
<< " key=" << result
.key
<< " etag=" << result
.etag
<< dendl
;
1046 static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx
& tier_ctx
,
1047 const rgw_obj
& dest_obj
, const rgw_raw_obj
& status_obj
,
1048 const std::string
& upload_id
) {
1051 ret
= cloud_tier_abort_multipart(tier_ctx
.dpp
, tier_ctx
.conn
, dest_obj
, upload_id
);
1054 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj
<< " upload_id=" << upload_id
<< " ret=" << ret
<< dendl
;
1055 /* ignore error, best effort */
1057 /* remove status obj */
1058 ret
= delete_upload_status(tier_ctx
.dpp
, tier_ctx
.store
, &status_obj
);
1060 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj
<< " ret=" << ret
<< dendl
;
1061 // ignore error, best effort
1066 static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx
& tier_ctx
) {
1071 std::string src_etag
;
1072 rgw_rest_obj rest_obj
;
1074 rgw_lc_multipart_upload_info status
;
1076 std::map
<std::string
, std::string
> new_attrs
;
1078 rgw_raw_obj status_obj
;
1081 std::string target_obj_name
;
1082 rgw_bucket target_bucket
;
1086 rgw_lc_obj_properties
obj_properties(tier_ctx
.o
.meta
.mtime
, tier_ctx
.o
.meta
.etag
,
1087 tier_ctx
.o
.versioned_epoch
, tier_ctx
.acl_mappings
,
1088 tier_ctx
.target_storage_class
);
1090 uint32_t part_size
{0};
1091 uint32_t num_parts
{0};
1094 uint64_t cur_ofs
{0};
1095 std::map
<int, rgw_lc_multipart_part_info
> parts
;
1097 obj_size
= tier_ctx
.o
.meta
.size
;
1099 target_bucket
.name
= tier_ctx
.target_bucket_name
;
1101 target_obj_name
= tier_ctx
.bucket_info
.bucket
.name
+ "/" +
1102 tier_ctx
.obj
->get_name();
1103 if (!tier_ctx
.o
.is_current()) {
1104 target_obj_name
+= get_key_instance(tier_ctx
.obj
->get_key());
1106 dest_obj
.init(target_bucket
, target_obj_name
);
1108 status_obj
= rgw_raw_obj(tier_ctx
.store
->get_zone()->get_params().log_pool
,
1109 "lc_multipart_" + tier_ctx
.obj
->get_oid());
1111 ret
= read_upload_status(tier_ctx
.dpp
, tier_ctx
.store
, &status_obj
, &status
);
1113 if (ret
< 0 && ret
!= -ENOENT
) {
1114 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to read sync status of object " << src_obj
<< " ret=" << ret
<< dendl
;
1119 // check here that mtime and size did not change
1120 if (status
.mtime
!= obj_properties
.mtime
|| status
.obj_size
!= obj_size
||
1121 status
.etag
!= obj_properties
.etag
) {
1122 cloud_tier_abort_multipart_upload(tier_ctx
, dest_obj
, status_obj
, status
.upload_id
);
1127 if (ret
== -ENOENT
) {
1128 RGWLCStreamRead
readf(tier_ctx
.cct
, tier_ctx
.dpp
, tier_ctx
.rctx
, tier_ctx
.obj
, tier_ctx
.o
.meta
.mtime
);
1132 rest_obj
= readf
.get_rest_obj();
1134 RGWLCCloudStreamPut::init_send_attrs(tier_ctx
.dpp
, rest_obj
, obj_properties
, new_attrs
);
1136 ret
= cloud_tier_init_multipart(tier_ctx
.dpp
, tier_ctx
.conn
, dest_obj
, obj_size
, new_attrs
, status
.upload_id
);
1141 status
.obj_size
= obj_size
;
1142 status
.mtime
= obj_properties
.mtime
;
1143 status
.etag
= obj_properties
.etag
;
1145 ret
= put_upload_status(tier_ctx
.dpp
, tier_ctx
.store
, &status_obj
, &status
);
1148 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to store multipart upload state, ret=" << ret
<< dendl
;
1149 // continue with upload anyway
1152 #define MULTIPART_MAX_PARTS 10000
1153 #define MULTIPART_MAX_PARTS 10000
1154 uint64_t min_part_size
= obj_size
/ MULTIPART_MAX_PARTS
;
1155 uint64_t min_conf_size
= tier_ctx
.multipart_min_part_size
;
1157 if (min_conf_size
< MULTIPART_MIN_POSSIBLE_PART_SIZE
) {
1158 min_conf_size
= MULTIPART_MIN_POSSIBLE_PART_SIZE
;
1161 part_size
= std::max(min_conf_size
, min_part_size
);
1162 num_parts
= (obj_size
+ part_size
- 1) / part_size
;
1167 for (; (uint32_t)cur_part
<= num_parts
; ++cur_part
) {
1168 ldpp_dout(tier_ctx
.dpp
, 20) << "cur_part = "<< cur_part
<< ", info.ofs = " << cur_ofs
<< ", info.size = " << part_size
<< ", obj size = " << obj_size
<< ", num_parts:" << num_parts
<< dendl
;
1169 rgw_lc_multipart_part_info
& cur_part_info
= parts
[cur_part
];
1170 cur_part_info
.part_num
= cur_part
;
1171 cur_part_info
.ofs
= cur_ofs
;
1172 cur_part_info
.size
= std::min((uint64_t)part_size
, obj_size
- cur_ofs
);
1174 cur_ofs
+= cur_part_info
.size
;
1176 ret
= cloud_tier_send_multipart_part(tier_ctx
,
1179 &cur_part_info
.etag
);
1182 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to send multipart part of obj=" << tier_ctx
.obj
<< ", sync via multipart upload, upload_id=" << status
.upload_id
<< " part number " << cur_part
<< " (error: " << cpp_strerror(-ret
) << ")" << dendl
;
1183 cloud_tier_abort_multipart_upload(tier_ctx
, dest_obj
, status_obj
, status
.upload_id
);
1189 ret
= cloud_tier_complete_multipart(tier_ctx
.dpp
, tier_ctx
.conn
, dest_obj
, status
.upload_id
, parts
);
1191 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx
.obj
<< " (error: " << cpp_strerror(-ret
) << ")" << dendl
;
1192 cloud_tier_abort_multipart_upload(tier_ctx
, dest_obj
, status_obj
, status
.upload_id
);
1196 /* remove status obj */
1197 ret
= delete_upload_status(tier_ctx
.dpp
, tier_ctx
.store
, &status_obj
);
1199 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx
.obj
<< " upload_id=" << status
.upload_id
<< " part number " << cur_part
<< " (" << cpp_strerror(-ret
) << ")" << dendl
;
1200 // ignore error, best effort
1205 /* Check if object has already been transitioned */
// Determines whether the object described by tier_ctx has already been
// transitioned to the cloud target, and reports the answer through the
// already_tiered out-parameter.
//
// tier_ctx        - transition context; this function reads its dpp, obj and
//                   o.meta.mtime members and passes the whole context to
//                   cloud_tier_get_object().
// already_tiered  - out: assigned the result of is_already_tiered().
//
// NOTE(review): the return statements and the condition guarding the error
// log are not visible in this excerpt; presumably a negative ret from the
// fetch is logged and returned -- confirm against the full function.
1206 static int cloud_tier_check_object(RGWLCCloudTierCtx
& tier_ctx
, bool& already_tiered
) {
// Header map handed to cloud_tier_get_object() and afterwards to
// is_already_tiered().
1208 std::map
<std::string
, std::string
> headers
;
1210 /* Fetch Head object */
// NOTE(review): the `true` argument presumably selects a HEAD-only request,
// going by the comment above -- confirm against cloud_tier_get_object().
1211 ret
= cloud_tier_get_object(tier_ctx
, true, headers
);
// Error report for the fetch, logged at level 0 together with the raw ret.
1214 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx
.obj
<< " , ret = " << ret
<< dendl
;
// The decision is delegated to is_already_tiered(), which receives the
// headers and the local object's mtime.
1218 already_tiered
= is_already_tiered(tier_ctx
.dpp
, headers
, tier_ctx
.o
.meta
.mtime
);
// Trace the outcome at debug level 20.
1220 if (already_tiered
) {
1221 ldpp_dout(tier_ctx
.dpp
, 20) << "is_already_tiered true" << dendl
;
// NOTE(review): presumably the else branch of the check above; the
// `else` line itself is not visible in this excerpt.
1223 ldpp_dout(tier_ctx
.dpp
, 20) << "is_already_tiered false..going with out_crf writing" << dendl
;
// Sends a PUT for tier_ctx.target_bucket_name over tier_ctx.conn to create
// the target bucket on the cloud endpoint, then inspects any response body
// for an XML "Error" element.
//
// tier_ctx - transition context; this function reads its dpp, conn,
//            storage_class and target_bucket_name members.
//
// An error code of "BucketAlreadyOwnedByYou" in the response is not reported
// as a failure; any other code is logged as an error.
//
// NOTE(review): the return statements, the declarations of ret/out_bl/bl/
// result and the conditions guarding some error logs are not visible in this
// excerpt -- confirm the exact return values against the full function.
1229 static int cloud_tier_create_bucket(RGWLCCloudTierCtx
& tier_ctx
) {
// NOTE(review): no use of `key` is visible in this excerpt -- check whether
// it is still needed.
1232 pair
<string
, string
> key(tier_ctx
.storage_class
, tier_ctx
.target_bucket_name
);
// Local decode target for the XML error body returned by the endpoint.
1233 struct CreateBucketResult
{
// Reads the "Code" child element into `code`.
1236 void decode_xml(XMLObj
*obj
) {
1237 RGWXMLDecoder::decode_xml("Code", code
, obj
);
1241 ldpp_dout(tier_ctx
.dpp
, 30) << "Cloud_tier_ctx: creating bucket:" << tier_ctx
.target_bucket_name
<< dendl
;
// The request resource is just the bucket name.
1243 string resource
= tier_ctx
.target_bucket_name
;
// Issue the PUT; out_bl and &bl are the buffers handed to send_resource().
// NOTE(review): out_bl presumably receives the response body, since it is
// the buffer parsed below -- confirm against send_resource().
1245 ret
= tier_ctx
.conn
.send_resource(tier_ctx
.dpp
, "PUT", resource
, nullptr, nullptr,
1246 out_bl
, &bl
, nullptr, null_yield
);
// Error report for the PUT request, logged at level 0.
1249 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to create target bucket: " << tier_ctx
.target_bucket_name
<< ", ret:" << ret
<< dendl
;
// Only a non-empty response body is parsed.
1252 if (out_bl
.length() > 0) {
1253 RGWXMLDecoder::XMLParser parser
;
1254 if (!parser
.init()) {
1255 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl
;
// NOTE(review): the trailing `1` presumably marks this buffer as the final
// chunk for the parser -- confirm against XMLParser::parse().
1259 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
// Copy the raw body into a string so it can be included in the log line.
1260 string
str(out_bl
.c_str(), out_bl
.length());
1261 ldpp_dout(tier_ctx
.dpp
, 5) << "ERROR: failed to parse xml createbucket: " << str
<< dendl
;
// Decode the top-level "Error" element into `result`.
// NOTE(review): the trailing `true` presumably makes the element mandatory,
// so a missing element raises RGWXMLDecoder::err -- confirm.
1266 RGWXMLDecoder::decode_xml("Error", result
, &parser
, true);
1267 } catch (RGWXMLDecoder::err
& err
) {
1268 string
str(out_bl
.c_str(), out_bl
.length());
1269 ldpp_dout(tier_ctx
.dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
// "BucketAlreadyOwnedByYou" is the one error code that is not logged as a
// failure here.
1273 if (result
.code
!= "BucketAlreadyOwnedByYou") {
1274 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: Creating target bucket failed with error: " << result
.code
<< dendl
;
// Entry point for transitioning one object to the cloud tier.
//
// Visible steps, in order:
//   1. If tier_ctx.target_bucket_created is false, call
//      cloud_tier_create_bucket() and then set the flag to true.
//   2. Call cloud_tier_check_object() to learn whether the object is already
//      on the cloud endpoint.
//   3. Compare the object size with the multipart threshold and call either
//      cloud_tier_plain_transfer() or cloud_tier_multipart_transfer().
//
// tier_ctx - transition context; target_bucket_created and
//            is_multipart_upload are written here, while dpp, o.key,
//            o.meta.size and multipart_sync_threshold are read.
//
// NOTE(review): the `if (ret < 0)` guards, the return statements and the end
// of the function are not visible in this excerpt -- confirm which failures
// abort the transfer against the full function.
1282 int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx
& tier_ctx
) {
1285 /* If run first time attempt to create the target bucket */
1286 if (!tier_ctx
.target_bucket_created
) {
1287 ret
= cloud_tier_create_bucket(tier_ctx
);
1290 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to create target bucket on the cloud endpoint ret=" << ret
<< dendl
;
// Record that bucket creation has been attempted, so that a context whose
// flag is set skips this step.
1293 tier_ctx
.target_bucket_created
= true;
// NOTE(review): the terminator of the block comment below is not visible in
// this excerpt -- make sure it is closed before the declaration that follows.
1296 /* Since multiple zones may try to transition the same object to the cloud,
1297 * verify if the object is already transitioned. And since its just a best
1298 * effort, do not bail out in case of any errors.
1300 bool already_tiered
= false;
1301 ret
= cloud_tier_check_object(tier_ctx
, already_tiered
);
1304 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to check object on the cloud endpoint ret=" << ret
<< dendl
;
1307 if (already_tiered
) {
1308 ldpp_dout(tier_ctx
.dpp
, 20) << "Object (" << tier_ctx
.o
.key
<< ") is already tiered" << dendl
;
// Choose the transfer method from the object size.
1312 uint64_t size
= tier_ctx
.o
.meta
.size
;
1313 uint64_t multipart_sync_threshold
= tier_ctx
.multipart_sync_threshold
;
// Raise the configured threshold to at least
// MULTIPART_MIN_POSSIBLE_PART_SIZE.
1315 if (multipart_sync_threshold
< MULTIPART_MIN_POSSIBLE_PART_SIZE
) {
1316 multipart_sync_threshold
= MULTIPART_MIN_POSSIBLE_PART_SIZE
;
// Objects below the threshold go through the plain transfer call.
1319 if (size
< multipart_sync_threshold
) {
1320 ret
= cloud_tier_plain_transfer(tier_ctx
);
// NOTE(review): presumably the else branch (size >= threshold); the `else`
// line itself is not visible in this excerpt.
1322 tier_ctx
.is_multipart_upload
= true;
1323 ret
= cloud_tier_multipart_transfer(tier_ctx
);
// Error report for whichever transfer call ran, logged at level 0.
1327 ldpp_dout(tier_ctx
.dpp
, 0) << "ERROR: failed to transition object ret=" << ret
<< dendl
;