1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/errno.h"
6 #include "rgw_common.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_sync_module.h"
9 #include "rgw_data_sync.h"
10 #include "rgw_sync_module_aws.h"
11 #include "rgw_cr_rados.h"
12 #include "rgw_rest_conn.h"
13 #include "rgw_cr_rest.h"
17 #include "services/svc_zone.h"
19 #include <boost/asio/yield.hpp>
21 #define dout_subsys ceph_subsys_rgw
24 #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
28 static string default_target_path
= "rgw-${zonegroup}-${sid}/${bucket}";
30 static string
get_key_oid(const rgw_obj_key
& key
)
32 string oid
= key
.name
;
33 if (!key
.instance
.empty() &&
34 !key
.have_null_instance()) {
35 oid
+= string(":") + key
.instance
;
40 static string
obj_to_aws_path(rgw::sal::Object
* obj
)
42 string path
= obj
->get_bucket()->get_name() + "/" + get_key_oid(obj
->get_key());
50 json configuration definition:
54 "access_key": <access>,
56 "endpoint": <endpoint>,
57 "host_style": <path | virtual>,
59 "acls": [ { "type": <id | email | uri>,
60 "source_id": <source_id>,
61 "dest_id": <dest_id> } ... ], # optional, acl mappings, no mappings if does not exist
62 "target_path": <target_path>, # override default
65 # anything below here is for non trivial configuration
66 # can be used in conjuction with the above
70 "access_key": <access>,
72 "endpoint": <endpoint>,
73 "host_style" <path | virtual>,
75 "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
77 "type" : <id | email | uri>, # optional, default is id
81 "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
82 # final object name will be target_path + "/" + obj
87 "access_key": <access>,
89 "endpoint": <endpoint>,
93 "id": <id>, # acl mappings
95 "type": <id | email | uri>,
103 "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
104 "target_path": <dest>, # (override default)
105 "connection_id": <connection_id>, # optional, if empty references default connection
106 "acls_id": <mappings_id>, # optional, if empty references default mappings
110 target path optional variables:
113 sid: sync instance id, randomly generated by sync process on first sync initalization
114 zonegroup: zonegroup name
115 zonegroup_id: zonegroup name
119 (evaluated when syncing)
126 ACLGranteeTypeEnum type
{ACL_TYPE_CANON_USER
};
130 ACLMapping() = default;
132 ACLMapping(ACLGranteeTypeEnum t
,
134 const string
& d
) : type(t
),
138 void init(const JSONFormattable
& config
) {
139 const string
& t
= config
["type"];
142 type
= ACL_TYPE_EMAIL_USER
;
143 } else if (t
== "uri") {
144 type
= ACL_TYPE_GROUP
;
146 type
= ACL_TYPE_CANON_USER
;
149 source_id
= config
["source_id"];
150 dest_id
= config
["dest_id"];
153 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
154 Formatter::ObjectSection
os(jf
, "acl_mapping");
157 case ACL_TYPE_EMAIL_USER
:
167 encode_json("type", s
, &jf
);
168 encode_json("source_id", source_id
, &jf
);
169 encode_json("dest_id", dest_id
, &jf
);
174 map
<string
, ACLMapping
> acl_mappings
;
176 void init(const JSONFormattable
& config
) {
177 for (auto& c
: config
.array()) {
181 acl_mappings
.emplace(std::make_pair(m
.source_id
, m
));
184 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
185 Formatter::ArraySection
os(jf
, "acls");
187 for (auto& i
: acl_mappings
) {
188 i
.second
.dump_conf(cct
, jf
);
193 struct AWSSyncConfig_ACLProfiles
{
194 map
<string
, std::shared_ptr
<ACLMappings
> > acl_profiles
;
196 void init(const JSONFormattable
& config
) {
197 for (auto& c
: config
.array()) {
198 const string
& profile_id
= c
["id"];
200 std::shared_ptr
<ACLMappings
> ap
{new ACLMappings
};
203 acl_profiles
[profile_id
] = ap
;
206 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
207 Formatter::ArraySection
section(jf
, "acl_profiles");
209 for (auto& p
: acl_profiles
) {
210 Formatter::ObjectSection
section(jf
, "profile");
211 encode_json("id", p
.first
, &jf
);
212 p
.second
->dump_conf(cct
, jf
);
216 bool find(const string
& profile_id
, ACLMappings
*result
) const {
217 auto iter
= acl_profiles
.find(profile_id
);
218 if (iter
== acl_profiles
.end()) {
221 *result
= *iter
->second
;
226 struct AWSSyncConfig_Connection
{
227 string connection_id
;
230 std::optional
<string
> region
;
231 HostStyle host_style
{PathStyle
};
233 bool has_endpoint
{false};
235 bool has_host_style
{false};
237 void init(const JSONFormattable
& config
) {
238 has_endpoint
= config
.exists("endpoint");
239 has_key
= config
.exists("access_key") || config
.exists("secret");
240 has_host_style
= config
.exists("host_style");
242 connection_id
= config
["id"];
243 endpoint
= config
["endpoint"];
245 key
= RGWAccessKey(config
["access_key"], config
["secret"]);
247 if (config
.exists("region")) {
248 region
= config
["region"];
253 string host_style_str
= config
["host_style"];
254 if (host_style_str
!= "virtual") {
255 host_style
= PathStyle
;
257 host_style
= VirtualStyle
;
260 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
261 Formatter::ObjectSection
section(jf
, "connection");
262 encode_json("id", connection_id
, &jf
);
263 encode_json("endpoint", endpoint
, &jf
);
264 string s
= (host_style
== PathStyle
? "path" : "virtual");
265 encode_json("region", region
, &jf
);
266 encode_json("host_style", s
, &jf
);
269 Formatter::ObjectSection
os(jf
, "key");
270 encode_json("access_key", key
.id
, &jf
);
271 string secret
= (key
.key
.empty() ? "" : "******");
272 encode_json("secret", secret
, &jf
);
277 static int conf_to_uint64(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
, const string
& key
, uint64_t *pval
)
280 if (config
.find(key
, &sval
)) {
282 uint64_t val
= strict_strtoll(sval
.c_str(), 10, &err
);
284 ldpp_dout(dpp
, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key
<< ": " << sval
<< dendl
;
292 struct AWSSyncConfig_S3
{
293 uint64_t multipart_sync_threshold
{DEFAULT_MULTIPART_SYNC_PART_SIZE
};
294 uint64_t multipart_min_part_size
{DEFAULT_MULTIPART_SYNC_PART_SIZE
};
296 int init(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
) {
297 int r
= conf_to_uint64(dpp
, cct
, config
, "multipart_sync_threshold", &multipart_sync_threshold
);
302 r
= conf_to_uint64(dpp
, cct
, config
, "multipart_min_part_size", &multipart_min_part_size
);
306 #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
307 if (multipart_min_part_size
< MULTIPART_MIN_POSSIBLE_PART_SIZE
) {
308 multipart_min_part_size
= MULTIPART_MIN_POSSIBLE_PART_SIZE
;
313 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
314 Formatter::ObjectSection
section(jf
, "s3");
315 encode_json("multipart_sync_threshold", multipart_sync_threshold
, &jf
);
316 encode_json("multipart_min_part_size", multipart_min_part_size
, &jf
);
320 struct AWSSyncConfig_Profile
{
321 string source_bucket
;
324 string connection_id
;
327 std::shared_ptr
<AWSSyncConfig_Connection
> conn_conf
;
328 std::shared_ptr
<ACLMappings
> acls
;
330 std::shared_ptr
<RGWRESTConn
> conn
;
332 void init(const JSONFormattable
& config
) {
333 source_bucket
= config
["source_bucket"];
335 prefix
= (!source_bucket
.empty() && source_bucket
[source_bucket
.size() - 1] == '*');
338 source_bucket
= source_bucket
.substr(0, source_bucket
.size() - 1);
341 target_path
= config
["target_path"];
342 connection_id
= config
["connection_id"];
343 acls_id
= config
["acls_id"];
345 if (config
.exists("connection")) {
346 conn_conf
= make_shared
<AWSSyncConfig_Connection
>();
347 conn_conf
->init(config
["connection"]);
350 if (config
.exists("acls")) {
351 acls
= make_shared
<ACLMappings
>();
352 acls
->init(config
["acls"]);
356 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
, const char *section
= "config") const {
357 Formatter::ObjectSection
config(jf
, section
);
358 string sb
{source_bucket
};
362 encode_json("source_bucket", sb
, &jf
);
363 encode_json("target_path", target_path
, &jf
);
364 encode_json("connection_id", connection_id
, &jf
);
365 encode_json("acls_id", acls_id
, &jf
);
366 if (conn_conf
.get()) {
367 conn_conf
->dump_conf(cct
, jf
);
370 acls
->dump_conf(cct
, jf
);
375 static void find_and_replace(const string
& src
, const string
& find
, const string
& replace
, string
*dest
)
379 size_t pos
= s
.find(find
);
380 while (pos
!= string::npos
) {
381 size_t next_ofs
= pos
+ find
.size();
382 s
= s
.substr(0, pos
) + replace
+ s
.substr(next_ofs
);
383 pos
= s
.find(find
, next_ofs
);
389 static void apply_meta_param(const string
& src
, const string
& param
, const string
& val
, string
*dest
)
391 string s
= string("${") + param
+ "}";
392 find_and_replace(src
, s
, val
, dest
);
396 struct AWSSyncConfig
{
397 AWSSyncConfig_Profile default_profile
;
398 std::shared_ptr
<AWSSyncConfig_Profile
> root_profile
;
400 map
<string
, std::shared_ptr
<AWSSyncConfig_Connection
> > connections
;
401 AWSSyncConfig_ACLProfiles acl_profiles
;
403 map
<string
, std::shared_ptr
<AWSSyncConfig_Profile
> > explicit_profiles
;
407 int init_profile(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& profile_conf
, AWSSyncConfig_Profile
& profile
,
408 bool connection_must_exist
) {
409 if (!profile
.connection_id
.empty()) {
410 if (profile
.conn_conf
) {
411 ldpp_dout(dpp
, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile
.connection_id
<< dendl
;
414 if (connections
.find(profile
.connection_id
) == connections
.end()) {
415 ldpp_dout(dpp
, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile
.connection_id
<< dendl
;
418 profile
.conn_conf
= connections
[profile
.connection_id
];
419 } else if (!profile
.conn_conf
) {
420 profile
.connection_id
= default_profile
.connection_id
;
421 auto i
= connections
.find(profile
.connection_id
);
422 if (i
!= connections
.end()) {
423 profile
.conn_conf
= i
->second
;
427 if (connection_must_exist
&& !profile
.conn_conf
) {
428 ldpp_dout(dpp
, 0) << "ERROR: remote connection undefined for sync profile" << dendl
;
432 if (profile
.conn_conf
&& default_profile
.conn_conf
) {
433 if (!profile
.conn_conf
->has_endpoint
) {
434 profile
.conn_conf
->endpoint
= default_profile
.conn_conf
->endpoint
;
436 if (!profile
.conn_conf
->has_host_style
) {
437 profile
.conn_conf
->host_style
= default_profile
.conn_conf
->host_style
;
439 if (!profile
.conn_conf
->has_key
) {
440 profile
.conn_conf
->key
= default_profile
.conn_conf
->key
;
444 ACLMappings acl_mappings
;
446 if (!profile
.acls_id
.empty()) {
447 if (!acl_profiles
.find(profile
.acls_id
, &acl_mappings
)) {
448 ldpp_dout(dpp
, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile
.acls_id
<< dendl
;
451 profile
.acls
= acl_profiles
.acl_profiles
[profile
.acls_id
];
452 } else if (!profile
.acls
) {
453 if (default_profile
.acls
) {
454 profile
.acls
= default_profile
.acls
;
455 profile
.acls_id
= default_profile
.acls_id
;
459 if (profile
.target_path
.empty()) {
460 profile
.target_path
= default_profile
.target_path
;
462 if (profile
.target_path
.empty()) {
463 profile
.target_path
= default_target_path
;
469 int init_target(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& profile_conf
, std::shared_ptr
<AWSSyncConfig_Profile
> *ptarget
) {
470 std::shared_ptr
<AWSSyncConfig_Profile
> profile
;
471 profile
.reset(new AWSSyncConfig_Profile
);
472 profile
->init(profile_conf
);
474 int ret
= init_profile(dpp
, cct
, profile_conf
, *profile
, true);
479 auto& sb
= profile
->source_bucket
;
481 if (explicit_profiles
.find(sb
) != explicit_profiles
.end()) {
482 ldpp_dout(dpp
, 0) << "WARNING: duplicate target configuration in sync module" << dendl
;
485 explicit_profiles
[sb
] = profile
;
492 bool do_find_profile(const rgw_bucket bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *result
) {
493 const string
& name
= bucket
.name
;
494 auto iter
= explicit_profiles
.upper_bound(name
);
495 if (iter
== explicit_profiles
.begin()) {
500 if (iter
->first
.size() > name
.size()) {
503 if (name
.compare(0, iter
->first
.size(), iter
->first
) != 0) {
507 std::shared_ptr
<AWSSyncConfig_Profile
>& target
= iter
->second
;
509 if (!target
->prefix
&&
510 name
.size() != iter
->first
.size()) {
518 void find_profile(const rgw_bucket bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *result
) {
519 if (!do_find_profile(bucket
, result
)) {
520 *result
= root_profile
;
526 int init(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
) {
527 auto& default_conf
= config
["default"];
529 if (config
.exists("default")) {
530 default_profile
.init(default_conf
);
531 init_profile(dpp
, cct
, default_conf
, default_profile
, false);
534 for (auto& conn
: config
["connections"].array()) {
535 auto new_conn
= conn
;
537 std::shared_ptr
<AWSSyncConfig_Connection
> c
{new AWSSyncConfig_Connection
};
540 connections
[new_conn
["id"]] = c
;
543 acl_profiles
.init(config
["acl_profiles"]);
545 int r
= s3
.init(dpp
, cct
, config
["s3"]);
550 auto new_root_conf
= config
;
552 r
= init_target(dpp
, cct
, new_root_conf
, &root_profile
); /* the root profile config */
557 for (auto target_conf
: config
["profiles"].array()) {
558 int r
= init_target(dpp
, cct
, target_conf
, nullptr);
564 JSONFormatter
jf(true);
569 ldpp_dout(dpp
, 5) << "sync module config (parsed representation):\n" << ss
.str() << dendl
;
574 void expand_target(RGWDataSyncCtx
*sc
, const string
& sid
, const string
& path
, string
*dest
) {
575 apply_meta_param(path
, "sid", sid
, dest
);
577 const RGWZoneGroup
& zg
= sc
->env
->svc
->zone
->get_zonegroup();
578 apply_meta_param(path
, "zonegroup", zg
.get_name(), dest
);
579 apply_meta_param(path
, "zonegroup_id", zg
.get_id(), dest
);
581 const RGWZone
& zone
= sc
->env
->svc
->zone
->get_zone();
582 apply_meta_param(path
, "zone", zone
.name
, dest
);
583 apply_meta_param(path
, "zone_id", zone
.id
, dest
);
586 void update_config(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, const string
& sid
) {
587 expand_target(sc
, sid
, root_profile
->target_path
, &root_profile
->target_path
);
588 ldpp_dout(dpp
, 20) << "updated target: (root) -> " << root_profile
->target_path
<< dendl
;
589 for (auto& t
: explicit_profiles
) {
590 expand_target(sc
, sid
, t
.second
->target_path
, &t
.second
->target_path
);
591 ldpp_dout(dpp
, 20) << "updated target: " << t
.first
<< " -> " << t
.second
->target_path
<< dendl
;
595 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
596 Formatter::ObjectSection
config(jf
, "config");
597 root_profile
->dump_conf(cct
, jf
);
598 jf
.open_array_section("connections");
599 for (auto c
: connections
) {
600 c
.second
->dump_conf(cct
, jf
);
604 acl_profiles
.dump_conf(cct
, jf
);
607 Formatter::ArraySection
as(jf
, "profiles");
608 for (auto& t
: explicit_profiles
) {
609 Formatter::ObjectSection
target_section(jf
, "profile");
610 encode_json("name", t
.first
, &jf
);
611 t
.second
->dump_conf(cct
, jf
);
616 string
get_path(std::shared_ptr
<AWSSyncConfig_Profile
>& profile
,
617 const RGWBucketInfo
& bucket_info
,
618 const rgw_obj_key
& obj
) {
621 if (!bucket_info
.owner
.tenant
.empty()) {
622 bucket_str
= owner
= bucket_info
.owner
.tenant
+ "-";
623 owner
+= bucket_info
.owner
.id
;
625 bucket_str
+= bucket_info
.bucket
.name
;
627 const string
& path
= profile
->target_path
;
630 apply_meta_param(path
, "bucket", bucket_str
, &new_path
);
631 apply_meta_param(new_path
, "owner", owner
, &new_path
);
633 new_path
+= string("/") + get_key_oid(obj
);
638 void get_target(std::shared_ptr
<AWSSyncConfig_Profile
>& profile
,
639 const RGWBucketInfo
& bucket_info
,
640 const rgw_obj_key
& obj
,
643 string path
= get_path(profile
, bucket_info
, obj
);
644 size_t pos
= path
.find('/');
646 *bucket_name
= path
.substr(0, pos
);
647 *obj_name
= path
.substr(pos
+ 1);
650 void init_conns(RGWDataSyncCtx
*sc
, const string
& id
) {
651 auto sync_env
= sc
->env
;
653 update_config(sync_env
->dpp
, sc
, id
);
655 auto& root_conf
= root_profile
->conn_conf
;
657 root_profile
->conn
.reset(new S3RESTConn(sc
->cct
,
660 { root_conf
->endpoint
},
663 root_conf
->host_style
));
665 for (auto i
: explicit_profiles
) {
668 c
->conn
.reset(new S3RESTConn(sc
->cct
,
671 { c
->conn_conf
->endpoint
},
673 c
->conn_conf
->region
,
674 c
->conn_conf
->host_style
));
680 struct AWSSyncInstanceEnv
{
684 explicit AWSSyncInstanceEnv(AWSSyncConfig
& _conf
) : conf(_conf
) {}
686 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) {
688 snprintf(buf
, sizeof(buf
), "%llx", (unsigned long long)instance_id
);
691 conf
.init_conns(sc
, id
);
694 void get_profile(const rgw_bucket
& bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *ptarget
) {
695 conf
.find_profile(bucket
, ptarget
);
696 ceph_assert(ptarget
);
700 static int do_decode_rest_obj(const DoutPrefixProvider
*dpp
, CephContext
*cct
, map
<string
, bufferlist
>& attrs
, map
<string
, string
>& headers
, rgw_rest_obj
*info
)
702 for (auto header
: headers
) {
703 const string
& val
= header
.second
;
704 if (header
.first
== "RGWX_OBJECT_SIZE") {
705 info
->content_len
= atoi(val
.c_str());
707 info
->attrs
[header
.first
] = val
;
711 info
->acls
.set_ctx(cct
);
712 auto aiter
= attrs
.find(RGW_ATTR_ACL
);
713 if (aiter
!= attrs
.end()) {
714 bufferlist
& bl
= aiter
->second
;
715 auto bliter
= bl
.cbegin();
717 info
->acls
.decode(bliter
);
718 } catch (buffer::error
& err
) {
719 ldpp_dout(dpp
, 0) << "ERROR: failed to decode policy off attrs" << dendl
;
723 ldpp_dout(dpp
, 0) << "WARNING: acl attrs not provided" << dendl
;
729 class RGWRESTStreamGetCRF
: public RGWStreamReadHTTPResourceCRF
733 rgw::sal::Object
* src_obj
;
734 RGWRESTConn::get_obj_params req_params
;
736 rgw_sync_aws_src_obj_properties src_properties
;
738 RGWRESTStreamGetCRF(CephContext
*_cct
,
739 RGWCoroutinesEnv
*_env
,
740 RGWCoroutine
*_caller
,
743 rgw::sal::Object
* _src_obj
,
744 const rgw_sync_aws_src_obj_properties
& _src_properties
) : RGWStreamReadHTTPResourceCRF(_cct
, _env
, _caller
,
745 _sc
->env
->http_manager
, _src_obj
->get_key()),
746 sc(_sc
), conn(_conn
), src_obj(_src_obj
),
747 src_properties(_src_properties
) {
750 int init(const DoutPrefixProvider
*dpp
) override
{
751 /* init input connection */
754 req_params
.get_op
= true;
755 req_params
.prepend_metadata
= true;
757 req_params
.unmod_ptr
= &src_properties
.mtime
;
758 req_params
.etag
= src_properties
.etag
;
759 req_params
.mod_zone_id
= src_properties
.zone_short_id
;
760 req_params
.mod_pg_ver
= src_properties
.pg_ver
;
763 req_params
.range_is_set
= true;
764 req_params
.range_start
= range
.ofs
;
765 req_params
.range_end
= range
.ofs
+ range
.size
- 1;
768 RGWRESTStreamRWRequest
*in_req
;
769 int ret
= conn
->get_obj(dpp
, src_obj
, req_params
, false /* send */, &in_req
);
771 ldpp_dout(dpp
, 0) << "ERROR: " << __func__
<< "(): conn->get_obj() returned ret=" << ret
<< dendl
;
777 return RGWStreamReadHTTPResourceCRF::init(dpp
);
780 int decode_rest_obj(const DoutPrefixProvider
*dpp
, map
<string
, string
>& headers
, bufferlist
& extra_data
) override
{
781 map
<string
, bufferlist
> src_attrs
;
783 ldpp_dout(dpp
, 20) << __func__
<< ":" << " headers=" << headers
<< " extra_data.length()=" << extra_data
.length() << dendl
;
785 if (extra_data
.length() > 0) {
787 if (!jp
.parse(extra_data
.c_str(), extra_data
.length())) {
788 ldpp_dout(dpp
, 0) << "ERROR: failed to parse response extra data. len=" << extra_data
.length() << " data=" << extra_data
.c_str() << dendl
;
792 JSONDecoder::decode_json("attrs", src_attrs
, &jp
);
794 return do_decode_rest_obj(dpp
, sc
->cct
, src_attrs
, headers
, &rest_obj
);
797 bool need_extra_data() override
{
802 static std::set
<string
> keep_headers
= { "CONTENT_TYPE",
804 "CONTENT_DISPOSITION",
805 "CONTENT_LANGUAGE" };
807 class RGWAWSStreamPutCRF
: public RGWStreamWriteHTTPResourceCRF
810 rgw_sync_aws_src_obj_properties src_properties
;
811 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
812 rgw::sal::Object
* dest_obj
;
815 RGWAWSStreamPutCRF(CephContext
*_cct
,
816 RGWCoroutinesEnv
*_env
,
817 RGWCoroutine
*_caller
,
819 const rgw_sync_aws_src_obj_properties
& _src_properties
,
820 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
821 rgw::sal::Object
* _dest_obj
) : RGWStreamWriteHTTPResourceCRF(_cct
, _env
, _caller
, _sc
->env
->http_manager
),
822 sc(_sc
), src_properties(_src_properties
), target(_target
), dest_obj(_dest_obj
) {
825 int init() override
{
826 /* init output connection */
827 RGWRESTStreamS3PutObj
*out_req
{nullptr};
829 if (multipart
.is_multipart
) {
831 snprintf(buf
, sizeof(buf
), "%d", multipart
.part_num
);
832 rgw_http_param_pair params
[] = { { "uploadId", multipart
.upload_id
.c_str() },
833 { "partNumber", buf
},
834 { nullptr, nullptr } };
835 target
->conn
->put_obj_send_init(dest_obj
, params
, &out_req
);
837 target
->conn
->put_obj_send_init(dest_obj
, nullptr, &out_req
);
842 return RGWStreamWriteHTTPResourceCRF::init();
845 static bool keep_attr(const string
& h
) {
846 return (keep_headers
.find(h
) != keep_headers
.end() ||
847 boost::algorithm::starts_with(h
, "X_AMZ_"));
850 static void init_send_attrs(const DoutPrefixProvider
*dpp
,
852 const rgw_rest_obj
& rest_obj
,
853 const rgw_sync_aws_src_obj_properties
& src_properties
,
854 const AWSSyncConfig_Profile
*target
,
855 map
<string
, string
> *attrs
) {
856 auto& new_attrs
= *attrs
;
860 for (auto& hi
: rest_obj
.attrs
) {
861 if (keep_attr(hi
.first
)) {
862 new_attrs
.insert(hi
);
866 auto acl
= rest_obj
.acls
.get_acl();
868 map
<int, vector
<string
> > access_map
;
871 for (auto& grant
: acl
.get_grant_map()) {
872 auto& orig_grantee
= grant
.first
;
873 auto& perm
= grant
.second
;
877 const auto& am
= target
->acls
->acl_mappings
;
879 auto iter
= am
.find(orig_grantee
);
880 if (iter
== am
.end()) {
881 ldpp_dout(dpp
, 20) << "acl_mappings: Could not find " << orig_grantee
<< " .. ignoring" << dendl
;
885 grantee
= iter
->second
.dest_id
;
889 switch (iter
->second
.type
) {
890 case ACL_TYPE_CANON_USER
:
893 case ACL_TYPE_EMAIL_USER
:
894 type
= "emailAddress";
903 string tv
= type
+ "=" + grantee
;
905 int flags
= perm
.get_permission().get_permissions();
906 if ((flags
& RGW_PERM_FULL_CONTROL
) == RGW_PERM_FULL_CONTROL
) {
907 access_map
[flags
].push_back(tv
);
911 for (int i
= 1; i
<= RGW_PERM_WRITE_ACP
; i
<<= 1) {
913 access_map
[i
].push_back(tv
);
919 for (auto aiter
: access_map
) {
920 int grant_type
= aiter
.first
;
922 string
header_str("x-amz-grant-");
924 switch (grant_type
) {
926 header_str
.append("read");
929 header_str
.append("write");
931 case RGW_PERM_READ_ACP
:
932 header_str
.append("read-acp");
934 case RGW_PERM_WRITE_ACP
:
935 header_str
.append("write-acp");
937 case RGW_PERM_FULL_CONTROL
:
938 header_str
.append("full-control");
944 for (auto viter
: aiter
.second
) {
951 ldpp_dout(dpp
, 20) << "acl_mappings: set acl: " << header_str
<< "=" << s
<< dendl
;
953 new_attrs
[header_str
] = s
;
957 snprintf(buf
, sizeof(buf
), "%llu", (long long)src_properties
.versioned_epoch
);
958 new_attrs
["x-amz-meta-rgwx-versioned-epoch"] = buf
;
960 utime_t
ut(src_properties
.mtime
);
961 snprintf(buf
, sizeof(buf
), "%lld.%09lld",
963 (long long)ut
.nsec());
965 new_attrs
["x-amz-meta-rgwx-source-mtime"] = buf
;
966 new_attrs
["x-amz-meta-rgwx-source-etag"] = src_properties
.etag
;
967 new_attrs
["x-amz-meta-rgwx-source-key"] = rest_obj
.key
.name
;
968 if (!rest_obj
.key
.instance
.empty()) {
969 new_attrs
["x-amz-meta-rgwx-source-version-id"] = rest_obj
.key
.instance
;
973 void send_ready(const DoutPrefixProvider
*dpp
, const rgw_rest_obj
& rest_obj
) override
{
974 RGWRESTStreamS3PutObj
*r
= static_cast<RGWRESTStreamS3PutObj
*>(req
);
976 map
<string
, string
> new_attrs
;
977 if (!multipart
.is_multipart
) {
978 init_send_attrs(dpp
, sc
->cct
, rest_obj
, src_properties
, target
.get(), &new_attrs
);
981 r
->set_send_length(rest_obj
.content_len
);
983 RGWAccessControlPolicy policy
;
985 r
->send_ready(dpp
, target
->conn
->get_key(), new_attrs
, policy
);
988 void handle_headers(const map
<string
, string
>& headers
) {
989 for (auto h
: headers
) {
990 if (h
.first
== "ETAG") {
996 bool get_etag(string
*petag
) {
1006 class RGWAWSStreamObjToCloudPlainCR
: public RGWCoroutine
{
1008 RGWRESTConn
*source_conn
;
1009 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1010 rgw::sal::Object
* src_obj
;
1011 rgw::sal::Object
* dest_obj
;
1013 rgw_sync_aws_src_obj_properties src_properties
;
1015 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
1016 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
1019 RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx
*_sc
,
1020 RGWRESTConn
*_source_conn
,
1021 rgw::sal::Object
* _src_obj
,
1022 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1023 std::shared_ptr
<AWSSyncConfig_Profile
> _target
,
1024 rgw::sal::Object
* _dest_obj
) : RGWCoroutine(_sc
->cct
),
1026 source_conn(_source_conn
),
1029 dest_obj(_dest_obj
),
1030 src_properties(_src_properties
) {}
1032 int operate(const DoutPrefixProvider
*dpp
) override
{
1035 in_crf
.reset(new RGWRESTStreamGetCRF(cct
, get_env(), this, sc
,
1036 source_conn
, src_obj
,
1040 out_crf
.reset(new RGWAWSStreamPutCRF(cct
, get_env(), this, sc
,
1041 src_properties
, target
, dest_obj
));
1043 yield
call(new RGWStreamSpliceCR(cct
, sc
->env
->http_manager
, in_crf
, out_crf
));
1045 return set_cr_error(retcode
);
1048 return set_cr_done();
1055 class RGWAWSStreamObjToCloudMultipartPartCR
: public RGWCoroutine
{
1057 RGWRESTConn
*source_conn
;
1058 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1059 rgw::sal::Object
* src_obj
;
1060 rgw::sal::Object
* dest_obj
;
1062 rgw_sync_aws_src_obj_properties src_properties
;
1066 rgw_sync_aws_multipart_part_info part_info
;
1068 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
1069 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
1074 RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx
*_sc
,
1075 RGWRESTConn
*_source_conn
,
1076 rgw::sal::Object
* _src_obj
,
1077 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
1078 rgw::sal::Object
* _dest_obj
,
1079 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1080 const string
& _upload_id
,
1081 const rgw_sync_aws_multipart_part_info
& _part_info
,
1082 string
*_petag
) : RGWCoroutine(_sc
->cct
),
1084 source_conn(_source_conn
),
1087 dest_obj(_dest_obj
),
1088 src_properties(_src_properties
),
1089 upload_id(_upload_id
),
1090 part_info(_part_info
),
1093 int operate(const DoutPrefixProvider
*dpp
) override
{
1096 in_crf
.reset(new RGWRESTStreamGetCRF(cct
, get_env(), this, sc
,
1097 source_conn
, src_obj
,
1100 in_crf
->set_range(part_info
.ofs
, part_info
.size
);
1103 out_crf
.reset(new RGWAWSStreamPutCRF(cct
, get_env(), this, sc
,
1104 src_properties
, target
, dest_obj
));
1106 out_crf
->set_multipart(upload_id
, part_info
.part_num
, part_info
.size
);
1108 yield
call(new RGWStreamSpliceCR(cct
, sc
->env
->http_manager
, in_crf
, out_crf
));
1110 return set_cr_error(retcode
);
1113 if (!(static_cast<RGWAWSStreamPutCRF
*>(out_crf
.get()))->get_etag(petag
)) {
1114 ldpp_dout(dpp
, 0) << "ERROR: failed to get etag from PUT request" << dendl
;
1115 return set_cr_error(-EIO
);
1118 return set_cr_done();
1125 class RGWAWSAbortMultipartCR
: public RGWCoroutine
{
1127 RGWRESTConn
*dest_conn
;
1128 rgw::sal::Object
* dest_obj
;
1133 RGWAWSAbortMultipartCR(RGWDataSyncCtx
*_sc
,
1134 RGWRESTConn
*_dest_conn
,
1135 rgw::sal::Object
* _dest_obj
,
1136 const string
& _upload_id
) : RGWCoroutine(_sc
->cct
),
1138 dest_conn(_dest_conn
),
1139 dest_obj(_dest_obj
),
1140 upload_id(_upload_id
) {}
1142 int operate(const DoutPrefixProvider
*dpp
) override
{
1146 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
1148 call(new RGWDeleteRESTResourceCR(sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1149 obj_to_aws_path(dest_obj
), params
));
1153 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj
<< " (retcode=" << retcode
<< ")" << dendl
;
1154 return set_cr_error(retcode
);
1157 return set_cr_done();
1164 class RGWAWSInitMultipartCR
: public RGWCoroutine
{
1166 RGWRESTConn
*dest_conn
;
1167 rgw::sal::Object
* dest_obj
;
1170 map
<string
, string
> attrs
;
1176 struct InitMultipartResult
{
1181 void decode_xml(XMLObj
*obj
) {
1182 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
1183 RGWXMLDecoder::decode_xml("Key", key
, obj
);
1184 RGWXMLDecoder::decode_xml("UploadId", upload_id
, obj
);
1189 RGWAWSInitMultipartCR(RGWDataSyncCtx
*_sc
,
1190 RGWRESTConn
*_dest_conn
,
1191 rgw::sal::Object
* _dest_obj
,
1193 const map
<string
, string
>& _attrs
,
1194 string
*_upload_id
) : RGWCoroutine(_sc
->cct
),
1196 dest_conn(_dest_conn
),
1197 dest_obj(_dest_obj
),
1198 obj_size(_obj_size
),
1200 upload_id(_upload_id
) {}
1202 int operate(const DoutPrefixProvider
*dpp
) override
{
1206 rgw_http_param_pair params
[] = { { "uploads", nullptr }, {nullptr, nullptr} };
1208 call(new RGWPostRawRESTResourceCR
<bufferlist
> (sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1209 obj_to_aws_path(dest_obj
), params
, &attrs
, bl
, &out_bl
));
1213 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
1214 return set_cr_error(retcode
);
1218 * If one of the following fails we cannot abort upload, as we cannot
1219 * extract the upload id. If one of these fail it's very likely that that's
1220 * the least of our problem.
1222 RGWXMLDecoder::XMLParser parser
;
1223 if (!parser
.init()) {
1224 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1225 return set_cr_error(-EIO
);
1228 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1229 string
str(out_bl
.c_str(), out_bl
.length());
1230 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1231 return set_cr_error(-EIO
);
1235 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result
, &parser
, true);
1236 } catch (RGWXMLDecoder::err
& err
) {
1237 string
str(out_bl
.c_str(), out_bl
.length());
1238 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1239 return set_cr_error(-EIO
);
1243 ldpp_dout(dpp
, 20) << "init multipart result: bucket=" << result
.bucket
<< " key=" << result
.key
<< " upload_id=" << result
.upload_id
<< dendl
;
1245 *upload_id
= result
.upload_id
;
1247 return set_cr_done();
1254 class RGWAWSCompleteMultipartCR
: public RGWCoroutine
{
1256 RGWRESTConn
*dest_conn
;
1257 rgw::sal::Object
* dest_obj
;
1263 struct CompleteMultipartReq
{
1264 map
<int, rgw_sync_aws_multipart_part_info
> parts
;
1266 explicit CompleteMultipartReq(const map
<int, rgw_sync_aws_multipart_part_info
>& _parts
) : parts(_parts
) {}
1268 void dump_xml(Formatter
*f
) const {
1269 for (auto p
: parts
) {
1270 f
->open_object_section("Part");
1271 encode_xml("PartNumber", p
.first
, f
);
1272 encode_xml("ETag", p
.second
.etag
, f
);
1278 struct CompleteMultipartResult
{
1284 void decode_xml(XMLObj
*obj
) {
1285 RGWXMLDecoder::decode_xml("Location", bucket
, obj
);
1286 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
1287 RGWXMLDecoder::decode_xml("Key", key
, obj
);
1288 RGWXMLDecoder::decode_xml("ETag", etag
, obj
);
1293 RGWAWSCompleteMultipartCR(RGWDataSyncCtx
*_sc
,
1294 RGWRESTConn
*_dest_conn
,
1295 rgw::sal::Object
* _dest_obj
,
1297 const map
<int, rgw_sync_aws_multipart_part_info
>& _parts
) : RGWCoroutine(_sc
->cct
),
1299 dest_conn(_dest_conn
),
1300 dest_obj(_dest_obj
),
1301 upload_id(_upload_id
),
1304 int operate(const DoutPrefixProvider
*dpp
) override
{
1308 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
1310 XMLFormatter formatter
;
1312 encode_xml("CompleteMultipartUpload", req_enc
, &formatter
);
1314 formatter
.flush(ss
);
1317 bl
.append(ss
.str());
1319 call(new RGWPostRawRESTResourceCR
<bufferlist
> (sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1320 obj_to_aws_path(dest_obj
), params
, nullptr, bl
, &out_bl
));
1324 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
1325 return set_cr_error(retcode
);
1329 * If one of the following fails we cannot abort upload, as we cannot
1330 * extract the upload id. If one of these fail it's very likely that that's
1331 * the least of our problem.
1333 RGWXMLDecoder::XMLParser parser
;
1334 if (!parser
.init()) {
1335 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1336 return set_cr_error(-EIO
);
1339 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1340 string
str(out_bl
.c_str(), out_bl
.length());
1341 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1342 return set_cr_error(-EIO
);
1346 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result
, &parser
, true);
1347 } catch (RGWXMLDecoder::err
& err
) {
1348 string
str(out_bl
.c_str(), out_bl
.length());
1349 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1350 return set_cr_error(-EIO
);
1354 ldpp_dout(dpp
, 20) << "complete multipart result: location=" << result
.location
<< " bucket=" << result
.bucket
<< " key=" << result
.key
<< " etag=" << result
.etag
<< dendl
;
1356 return set_cr_done();
1364 class RGWAWSStreamAbortMultipartUploadCR
: public RGWCoroutine
{
1366 RGWRESTConn
*dest_conn
;
1367 rgw::sal::Object
* dest_obj
;
1368 const rgw_raw_obj status_obj
;
1374 RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx
*_sc
,
1375 RGWRESTConn
*_dest_conn
,
1376 rgw::sal::Object
* _dest_obj
,
1377 const rgw_raw_obj
& _status_obj
,
1378 const string
& _upload_id
) : RGWCoroutine(_sc
->cct
), sc(_sc
),
1379 dest_conn(_dest_conn
),
1380 dest_obj(_dest_obj
),
1381 status_obj(_status_obj
),
1382 upload_id(_upload_id
) {}
1384 int operate(const DoutPrefixProvider
*dpp
) override
{
1386 yield
call(new RGWAWSAbortMultipartCR(sc
, dest_conn
, dest_obj
, upload_id
));
1388 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj
<< " upload_id=" << upload_id
<< " retcode=" << retcode
<< dendl
;
1389 /* ignore error, best effort */
1391 yield
call(new RGWRadosRemoveCR(sc
->env
->store
, status_obj
));
1393 ldpp_dout(dpp
, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj
<< " retcode=" << retcode
<< dendl
;
1394 /* ignore error, best effort */
1396 return set_cr_done();
1403 class RGWAWSStreamObjToCloudMultipartCR
: public RGWCoroutine
{
1405 RGWDataSyncEnv
*sync_env
;
1406 AWSSyncConfig
& conf
;
1407 RGWRESTConn
*source_conn
;
1408 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1409 rgw::sal::Object
* src_obj
;
1410 rgw::sal::Object
* dest_obj
;
1414 rgw_sync_aws_src_obj_properties src_properties
;
1415 rgw_rest_obj rest_obj
;
1417 rgw_sync_aws_multipart_upload_info status
;
1419 map
<string
, string
> new_attrs
;
1421 rgw_sync_aws_multipart_part_info
*pcur_part_info
{nullptr};
1425 rgw_raw_obj status_obj
;
1428 RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx
*_sc
,
1429 rgw_bucket_sync_pipe
& _sync_pipe
,
1430 AWSSyncConfig
& _conf
,
1431 RGWRESTConn
*_source_conn
,
1432 rgw::sal::Object
* _src_obj
,
1433 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
1434 rgw::sal::Object
* _dest_obj
,
1436 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1437 const rgw_rest_obj
& _rest_obj
) : RGWCoroutine(_sc
->cct
),
1441 source_conn(_source_conn
),
1444 dest_obj(_dest_obj
),
1445 obj_size(_obj_size
),
1446 src_properties(_src_properties
),
1447 rest_obj(_rest_obj
),
1448 status_obj(sync_env
->svc
->zone
->get_zone_params().log_pool
,
1449 RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe
, sc
->source_zone
, src_obj
)) {
1453 int operate(const DoutPrefixProvider
*dpp
) override
{
1455 yield
call(new RGWSimpleRadosReadCR
<rgw_sync_aws_multipart_upload_info
>(dpp
, sync_env
->async_rados
, sync_env
->svc
->sysobj
,
1456 status_obj
, &status
, false));
1458 if (retcode
< 0 && retcode
!= -ENOENT
) {
1459 ldpp_dout(dpp
, 0) << "ERROR: failed to read sync status of object " << src_obj
<< " retcode=" << retcode
<< dendl
;
1464 /* check here that mtime and size did not change */
1466 if (status
.src_properties
.mtime
!= src_properties
.mtime
|| status
.obj_size
!= obj_size
||
1467 status
.src_properties
.etag
!= src_properties
.etag
) {
1468 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1473 if (retcode
== -ENOENT
) {
1474 RGWAWSStreamPutCRF::init_send_attrs(dpp
, sc
->cct
, rest_obj
, src_properties
, target
.get(), &new_attrs
);
1476 yield
call(new RGWAWSInitMultipartCR(sc
, target
->conn
.get(), dest_obj
, status
.obj_size
, std::move(new_attrs
), &status
.upload_id
));
1478 return set_cr_error(retcode
);
1481 status
.obj_size
= obj_size
;
1482 status
.src_properties
= src_properties
;
1483 #define MULTIPART_MAX_PARTS 10000
1484 uint64_t min_part_size
= obj_size
/ MULTIPART_MAX_PARTS
;
1485 status
.part_size
= std::max(conf
.s3
.multipart_min_part_size
, min_part_size
);
1486 status
.num_parts
= (obj_size
+ status
.part_size
- 1) / status
.part_size
;
1487 status
.cur_part
= 1;
1490 for (; (uint32_t)status
.cur_part
<= status
.num_parts
; ++status
.cur_part
) {
1492 rgw_sync_aws_multipart_part_info
& cur_part_info
= status
.parts
[status
.cur_part
];
1493 cur_part_info
.part_num
= status
.cur_part
;
1494 cur_part_info
.ofs
= status
.cur_ofs
;
1495 cur_part_info
.size
= std::min((uint64_t)status
.part_size
, status
.obj_size
- status
.cur_ofs
);
1497 pcur_part_info
= &cur_part_info
;
1499 status
.cur_ofs
+= status
.part_size
;
1501 call(new RGWAWSStreamObjToCloudMultipartPartCR(sc
,
1502 source_conn
, src_obj
,
1505 status
.src_properties
,
1508 &cur_part_info
.etag
));
1512 ldpp_dout(dpp
, 0) << "ERROR: failed to sync obj=" << src_obj
<< ", sync via multipart upload, upload_id=" << status
.upload_id
<< " part number " << status
.cur_part
<< " (error: " << cpp_strerror(-retcode
) << ")" << dendl
;
1514 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1515 return set_cr_error(ret_err
);
1518 yield
call(new RGWSimpleRadosWriteCR
<rgw_sync_aws_multipart_upload_info
>(dpp
, sync_env
->async_rados
, sync_env
->svc
->sysobj
, status_obj
, status
));
1520 ldpp_dout(dpp
, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode
<< dendl
;
1521 /* continue with upload anyway */
1523 ldpp_dout(dpp
, 20) << "sync of object=" << src_obj
<< " via multipart upload, finished sending part #" << status
.cur_part
<< " etag=" << pcur_part_info
->etag
<< dendl
;
1526 yield
call(new RGWAWSCompleteMultipartCR(sc
, target
->conn
.get(), dest_obj
, status
.upload_id
, status
.parts
));
1528 ldpp_dout(dpp
, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj
<< " (error: " << cpp_strerror(-retcode
) << ")" << dendl
;
1530 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1531 return set_cr_error(ret_err
);
1534 /* remove status obj */
1535 yield
call(new RGWRadosRemoveCR(sync_env
->store
, status_obj
));
1537 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj
<< " upload_id=" << status
.upload_id
<< " part number " << status
.cur_part
<< " (" << cpp_strerror(-retcode
) << ")" << dendl
;
1538 /* ignore error, best effort */
1540 return set_cr_done();
1547 int decode_attr(map
<string
, bufferlist
>& attrs
, const char *attr_name
, T
*result
, T def_val
)
1549 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
1550 if (iter
== attrs
.end()) {
1554 bufferlist
& bl
= iter
->second
;
1555 if (bl
.length() == 0) {
1559 auto bliter
= bl
.cbegin();
1561 decode(*result
, bliter
);
1562 } catch (buffer::error
& err
) {
1568 // maybe use Fetch Remote Obj instead?
1569 class RGWAWSHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
1570 rgw_bucket_sync_pipe sync_pipe
;
1571 AWSSyncInstanceEnv
& instance
;
1573 uint64_t versioned_epoch
{0};
1575 RGWRESTConn
*source_conn
{nullptr};
1576 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1578 unordered_map
<string
, bool> bucket_created
;
1579 string target_bucket_name
;
1580 string target_obj_name
;
1581 rgw_rest_obj rest_obj
;
1584 uint32_t src_zone_short_id
{0};
1585 uint64_t src_pg_ver
{0};
1589 struct CreateBucketResult
{
1592 void decode_xml(XMLObj
*obj
) {
1593 RGWXMLDecoder::decode_xml("Code", code
, obj
);
1598 RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1599 rgw_bucket_sync_pipe
& _sync_pipe
,
1601 AWSSyncInstanceEnv
& _instance
,
1602 uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1603 sync_pipe(_sync_pipe
),
1604 instance(_instance
), versioned_epoch(_versioned_epoch
)
1607 ~RGWAWSHandleRemoteObjCBCR(){
1610 int operate(const DoutPrefixProvider
*dpp
) override
{
1612 ret
= decode_attr(attrs
, RGW_ATTR_PG_VER
, &src_pg_ver
, (uint64_t)0);
1614 ldpp_dout(dpp
, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl
;
1616 ret
= decode_attr(attrs
, RGW_ATTR_SOURCE_ZONE
, &src_zone_short_id
, (uint32_t)0);
1618 ldpp_dout(dpp
, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl
;
1619 src_pg_ver
= 0; /* all or nothing */
1622 ldpp_dout(dpp
, 4) << "AWS: download begin: z=" << sc
->source_zone
1623 << " b=" << src_bucket
<< " k=" << key
<< " size=" << size
1624 << " mtime=" << mtime
<< " etag=" << etag
1625 << " zone_short_id=" << src_zone_short_id
<< " pg_ver=" << src_pg_ver
1628 source_conn
= sync_env
->svc
->zone
->get_zone_conn(sc
->source_zone
);
1630 ldpp_dout(dpp
, 0) << "ERROR: cannot find http connection to zone " << sc
->source_zone
<< dendl
;
1631 return set_cr_error(-EINVAL
);
1634 instance
.get_profile(sync_pipe
.info
.source_bs
.bucket
, &target
);
1635 instance
.conf
.get_target(target
, sync_pipe
.dest_bucket_info
, key
, &target_bucket_name
, &target_obj_name
);
1637 if (bucket_created
.find(target_bucket_name
) == bucket_created
.end()){
1639 ldpp_dout(dpp
, 0) << "AWS: creating bucket " << target_bucket_name
<< dendl
;
1641 call(new RGWPutRawRESTResourceCR
<bufferlist
> (sc
->cct
, target
->conn
.get(),
1642 sync_env
->http_manager
,
1643 target_bucket_name
, nullptr, bl
, &out_bl
));
1646 RGWXMLDecoder::XMLParser parser
;
1647 if (!parser
.init()) {
1648 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1649 return set_cr_error(retcode
);
1652 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1653 string
str(out_bl
.c_str(), out_bl
.length());
1654 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1655 return set_cr_error(retcode
);
1659 RGWXMLDecoder::decode_xml("Error", result
, &parser
, true);
1660 } catch (RGWXMLDecoder::err
& err
) {
1661 string
str(out_bl
.c_str(), out_bl
.length());
1662 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1663 return set_cr_error(retcode
);
1666 if (result
.code
!= "BucketAlreadyOwnedByYou") {
1667 return set_cr_error(retcode
);
1671 bucket_created
[target_bucket_name
] = true;
1675 rgw::sal::RadosBucket
bucket(sync_env
->store
, src_bucket
);
1676 rgw::sal::RadosObject
src_obj(sync_env
->store
, key
, &bucket
);
1679 rgw_bucket target_bucket
;
1680 target_bucket
.name
= target_bucket_name
; /* this is only possible because we only use bucket name for
1682 rgw::sal::RadosBucket
dest_bucket(sync_env
->store
, target_bucket
);
1683 rgw::sal::RadosObject
dest_obj(sync_env
->store
, rgw_obj_key(target_obj_name
), &dest_bucket
);
1686 rgw_sync_aws_src_obj_properties src_properties
;
1687 src_properties
.mtime
= mtime
;
1688 src_properties
.etag
= etag
;
1689 src_properties
.zone_short_id
= src_zone_short_id
;
1690 src_properties
.pg_ver
= src_pg_ver
;
1691 src_properties
.versioned_epoch
= versioned_epoch
;
1693 if (size
< instance
.conf
.s3
.multipart_sync_threshold
) {
1694 call(new RGWAWSStreamObjToCloudPlainCR(sc
, source_conn
, &src_obj
,
1699 rgw_rest_obj rest_obj
;
1701 if (do_decode_rest_obj(dpp
, sc
->cct
, attrs
, headers
, &rest_obj
)) {
1702 ldpp_dout(dpp
, 0) << "ERROR: failed to decode rest obj out of headers=" << headers
<< ", attrs=" << attrs
<< dendl
;
1703 return set_cr_error(-EINVAL
);
1705 call(new RGWAWSStreamObjToCloudMultipartCR(sc
, sync_pipe
, instance
.conf
, source_conn
, &src_obj
,
1706 target
, &dest_obj
, size
, src_properties
, rest_obj
));
1710 return set_cr_error(retcode
);
1713 return set_cr_done();
1720 class RGWAWSHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
1721 rgw_bucket_sync_pipe sync_pipe
;
1722 AWSSyncInstanceEnv
& instance
;
1723 uint64_t versioned_epoch
;
1725 RGWAWSHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
1726 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1727 AWSSyncInstanceEnv
& _instance
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1728 sync_pipe(_sync_pipe
),
1729 instance(_instance
), versioned_epoch(_versioned_epoch
) {
1732 ~RGWAWSHandleRemoteObjCR() {}
1734 RGWStatRemoteObjCBCR
*allocate_callback() override
{
1735 return new RGWAWSHandleRemoteObjCBCR(sc
, sync_pipe
, key
, instance
, versioned_epoch
);
1739 class RGWAWSRemoveRemoteObjCBCR
: public RGWCoroutine
{
1741 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1742 rgw_bucket_sync_pipe sync_pipe
;
1744 ceph::real_time mtime
;
1745 AWSSyncInstanceEnv
& instance
;
1748 RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1749 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
1750 AWSSyncInstanceEnv
& _instance
) : RGWCoroutine(_sc
->cct
), sc(_sc
),
1751 sync_pipe(_sync_pipe
), key(_key
),
1752 mtime(_mtime
), instance(_instance
) {}
1753 int operate(const DoutPrefixProvider
*dpp
) override
{
1755 ldpp_dout(dpp
, 0) << ": remove remote obj: z=" << sc
->source_zone
1756 << " b=" <<sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
1758 instance
.get_profile(sync_pipe
.info
.source_bs
.bucket
, &target
);
1759 string path
= instance
.conf
.get_path(target
, sync_pipe
.dest_bucket_info
, key
);
1760 ldpp_dout(dpp
, 0) << "AWS: removing aws object at" << path
<< dendl
;
1762 call(new RGWDeleteRESTResourceCR(sc
->cct
, target
->conn
.get(),
1763 sc
->env
->http_manager
,
1764 path
, nullptr /* params */));
1767 return set_cr_error(retcode
);
1769 return set_cr_done();
1777 class RGWAWSDataSyncModule
: public RGWDataSyncModule
{
1779 AWSSyncInstanceEnv instance
;
1781 RGWAWSDataSyncModule(CephContext
*_cct
, AWSSyncConfig
& _conf
) :
1786 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
1787 instance
.init(sc
, instance_id
);
1790 ~RGWAWSDataSyncModule() {}
1792 RGWCoroutine
*sync_object(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
,
1793 std::optional
<uint64_t> versioned_epoch
,
1794 rgw_zone_set
*zones_trace
) override
{
1795 ldout(sc
->cct
, 0) << instance
.id
<< ": sync_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
1796 return new RGWAWSHandleRemoteObjCR(sc
, sync_pipe
, key
, instance
, versioned_epoch
.value_or(0));
1798 RGWCoroutine
*remove_object(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
,
1799 rgw_zone_set
*zones_trace
) override
{
1800 ldout(sc
->cct
, 0) <<"rm_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1801 return new RGWAWSRemoveRemoteObjCBCR(sc
, sync_pipe
, key
, mtime
, instance
);
1803 RGWCoroutine
*create_delete_marker(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
,
1804 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
,
1805 rgw_zone_set
*zones_trace
) override
{
1806 ldout(sc
->cct
, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
1807 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1812 class RGWAWSSyncModuleInstance
: public RGWSyncModuleInstance
{
1813 RGWAWSDataSyncModule data_handler
;
1815 RGWAWSSyncModuleInstance(CephContext
*cct
, AWSSyncConfig
& _conf
) : data_handler(cct
, _conf
) {}
1816 RGWDataSyncModule
*get_data_handler() override
{
1817 return &data_handler
;
1821 int RGWAWSSyncModule::create_instance(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
){
1824 int r
= conf
.init(dpp
, cct
, config
);
1829 instance
->reset(new RGWAWSSyncModuleInstance(cct
, conf
));