1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/errno.h"
6 #include "rgw_common.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_sync_module.h"
9 #include "rgw_data_sync.h"
10 #include "rgw_sync_module_aws.h"
11 #include "rgw_cr_rados.h"
12 #include "rgw_rest_conn.h"
13 #include "rgw_cr_rest.h"
17 #include "services/svc_zone.h"
19 #include <boost/asio/yield.hpp>
21 #define dout_subsys ceph_subsys_rgw
24 #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
26 static string default_target_path
= "rgw-${zonegroup}-${sid}/${bucket}";
28 static string
get_key_oid(const rgw_obj_key
& key
)
30 string oid
= key
.name
;
31 if (!key
.instance
.empty() &&
32 !key
.have_null_instance()) {
33 oid
+= string(":") + key
.instance
;
38 static string
obj_to_aws_path(const rgw_obj
& obj
)
40 string path
= obj
.bucket
.name
+ "/" + get_key_oid(obj
.key
);
48 json configuration definition:
52 "access_key": <access>,
54 "endpoint": <endpoint>,
55 "host_style": <path | virtual>,
57 "acls": [ { "type": <id | email | uri>,
58 "source_id": <source_id>,
59 "dest_id": <dest_id> } ... ], # optional, acl mappings, no mappings if does not exist
60 "target_path": <target_path>, # override default
63 # anything below here is for non trivial configuration
64 # can be used in conjuction with the above
68 "access_key": <access>,
70 "endpoint": <endpoint>,
71 "host_style" <path | virtual>,
73 "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
75 "type" : <id | email | uri>, # optional, default is id
79 "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
80 # final object name will be target_path + "/" + obj
85 "access_key": <access>,
87 "endpoint": <endpoint>,
91 "id": <id>, # acl mappings
93 "type": <id | email | uri>,
101 "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
102 "target_path": <dest>, # (override default)
103 "connection_id": <connection_id>, # optional, if empty references default connection
104 "acls_id": <mappings_id>, # optional, if empty references default mappings
108 target path optional variables:
111 sid: sync instance id, randomly generated by sync process on first sync initalization
112 zonegroup: zonegroup name
113 zonegroup_id: zonegroup name
117 (evaluated when syncing)
124 ACLGranteeTypeEnum type
{ACL_TYPE_CANON_USER
};
128 ACLMapping() = default;
130 ACLMapping(ACLGranteeTypeEnum t
,
132 const string
& d
) : type(t
),
136 void init(const JSONFormattable
& config
) {
137 const string
& t
= config
["type"];
140 type
= ACL_TYPE_EMAIL_USER
;
141 } else if (t
== "uri") {
142 type
= ACL_TYPE_GROUP
;
144 type
= ACL_TYPE_CANON_USER
;
147 source_id
= config
["source_id"];
148 dest_id
= config
["dest_id"];
151 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
152 Formatter::ObjectSection
os(jf
, "acl_mapping");
155 case ACL_TYPE_EMAIL_USER
:
165 encode_json("type", s
, &jf
);
166 encode_json("source_id", source_id
, &jf
);
167 encode_json("dest_id", dest_id
, &jf
);
172 map
<string
, ACLMapping
> acl_mappings
;
174 void init(const JSONFormattable
& config
) {
175 for (auto& c
: config
.array()) {
179 acl_mappings
.emplace(std::make_pair(m
.source_id
, m
));
182 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
183 Formatter::ArraySection
os(jf
, "acls");
185 for (auto& i
: acl_mappings
) {
186 i
.second
.dump_conf(cct
, jf
);
191 struct AWSSyncConfig_ACLProfiles
{
192 map
<string
, std::shared_ptr
<ACLMappings
> > acl_profiles
;
194 void init(const JSONFormattable
& config
) {
195 for (auto& c
: config
.array()) {
196 const string
& profile_id
= c
["id"];
198 std::shared_ptr
<ACLMappings
> ap
{new ACLMappings
};
201 acl_profiles
[profile_id
] = ap
;
204 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
205 Formatter::ArraySection
section(jf
, "acl_profiles");
207 for (auto& p
: acl_profiles
) {
208 Formatter::ObjectSection
section(jf
, "profile");
209 encode_json("id", p
.first
, &jf
);
210 p
.second
->dump_conf(cct
, jf
);
214 bool find(const string
& profile_id
, ACLMappings
*result
) const {
215 auto iter
= acl_profiles
.find(profile_id
);
216 if (iter
== acl_profiles
.end()) {
219 *result
= *iter
->second
;
224 struct AWSSyncConfig_Connection
{
225 string connection_id
;
228 HostStyle host_style
{PathStyle
};
230 bool has_endpoint
{false};
232 bool has_host_style
{false};
234 void init(const JSONFormattable
& config
) {
235 has_endpoint
= config
.exists("endpoint");
236 has_key
= config
.exists("access_key") || config
.exists("secret");
237 has_host_style
= config
.exists("host_style");
239 connection_id
= config
["id"];
240 endpoint
= config
["endpoint"];
242 key
= RGWAccessKey(config
["access_key"], config
["secret"]);
243 string host_style_str
= config
["host_style"];
244 if (host_style_str
!= "virtual") {
245 host_style
= PathStyle
;
247 host_style
= VirtualStyle
;
250 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
251 Formatter::ObjectSection
section(jf
, "connection");
252 encode_json("id", connection_id
, &jf
);
253 encode_json("endpoint", endpoint
, &jf
);
254 string s
= (host_style
== PathStyle
? "path" : "virtual");
255 encode_json("host_style", s
, &jf
);
258 Formatter::ObjectSection
os(jf
, "key");
259 encode_json("access_key", key
.id
, &jf
);
260 string secret
= (key
.key
.empty() ? "" : "******");
261 encode_json("secret", secret
, &jf
);
266 static int conf_to_uint64(CephContext
*cct
, const JSONFormattable
& config
, const string
& key
, uint64_t *pval
)
269 if (config
.find(key
, &sval
)) {
271 uint64_t val
= strict_strtoll(sval
.c_str(), 10, &err
);
273 ldout(cct
, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key
<< ": " << sval
<< dendl
;
281 struct AWSSyncConfig_S3
{
282 uint64_t multipart_sync_threshold
{DEFAULT_MULTIPART_SYNC_PART_SIZE
};
283 uint64_t multipart_min_part_size
{DEFAULT_MULTIPART_SYNC_PART_SIZE
};
285 int init(CephContext
*cct
, const JSONFormattable
& config
) {
286 int r
= conf_to_uint64(cct
, config
, "multipart_sync_threshold", &multipart_sync_threshold
);
291 r
= conf_to_uint64(cct
, config
, "multipart_min_part_size", &multipart_min_part_size
);
295 #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
296 if (multipart_min_part_size
< MULTIPART_MIN_POSSIBLE_PART_SIZE
) {
297 multipart_min_part_size
= MULTIPART_MIN_POSSIBLE_PART_SIZE
;
302 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
303 Formatter::ObjectSection
section(jf
, "s3");
304 encode_json("multipart_sync_threshold", multipart_sync_threshold
, &jf
);
305 encode_json("multipart_min_part_size", multipart_min_part_size
, &jf
);
309 struct AWSSyncConfig_Profile
{
310 string source_bucket
;
313 string connection_id
;
316 std::shared_ptr
<AWSSyncConfig_Connection
> conn_conf
;
317 std::shared_ptr
<ACLMappings
> acls
;
319 std::shared_ptr
<RGWRESTConn
> conn
;
321 void init(const JSONFormattable
& config
) {
322 source_bucket
= config
["source_bucket"];
324 prefix
= (!source_bucket
.empty() && source_bucket
[source_bucket
.size() - 1] == '*');
327 source_bucket
= source_bucket
.substr(0, source_bucket
.size() - 1);
330 target_path
= config
["target_path"];
331 connection_id
= config
["connection_id"];
332 acls_id
= config
["acls_id"];
334 if (config
.exists("connection")) {
335 conn_conf
= make_shared
<AWSSyncConfig_Connection
>();
336 conn_conf
->init(config
["connection"]);
339 if (config
.exists("acls")) {
340 acls
= make_shared
<ACLMappings
>();
341 acls
->init(config
["acls"]);
345 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
, const char *section
= "config") const {
346 Formatter::ObjectSection
config(jf
, section
);
347 string sb
{source_bucket
};
351 encode_json("source_bucket", sb
, &jf
);
352 encode_json("target_path", target_path
, &jf
);
353 encode_json("connection_id", connection_id
, &jf
);
354 encode_json("acls_id", acls_id
, &jf
);
355 if (conn_conf
.get()) {
356 conn_conf
->dump_conf(cct
, jf
);
359 acls
->dump_conf(cct
, jf
);
364 static void find_and_replace(const string
& src
, const string
& find
, const string
& replace
, string
*dest
)
368 size_t pos
= s
.find(find
);
369 while (pos
!= string::npos
) {
370 size_t next_ofs
= pos
+ find
.size();
371 s
= s
.substr(0, pos
) + replace
+ s
.substr(next_ofs
);
372 pos
= s
.find(find
, next_ofs
);
378 static void apply_meta_param(const string
& src
, const string
& param
, const string
& val
, string
*dest
)
380 string s
= string("${") + param
+ "}";
381 find_and_replace(src
, s
, val
, dest
);
385 struct AWSSyncConfig
{
386 AWSSyncConfig_Profile default_profile
;
387 std::shared_ptr
<AWSSyncConfig_Profile
> root_profile
;
389 map
<string
, std::shared_ptr
<AWSSyncConfig_Connection
> > connections
;
390 AWSSyncConfig_ACLProfiles acl_profiles
;
392 map
<string
, std::shared_ptr
<AWSSyncConfig_Profile
> > explicit_profiles
;
396 int init_profile(CephContext
*cct
, const JSONFormattable
& profile_conf
, AWSSyncConfig_Profile
& profile
,
397 bool connection_must_exist
) {
398 if (!profile
.connection_id
.empty()) {
399 if (profile
.conn_conf
) {
400 ldout(cct
, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile
.connection_id
<< dendl
;
403 if (connections
.find(profile
.connection_id
) == connections
.end()) {
404 ldout(cct
, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile
.connection_id
<< dendl
;
407 profile
.conn_conf
= connections
[profile
.connection_id
];
408 } else if (!profile
.conn_conf
) {
409 profile
.connection_id
= default_profile
.connection_id
;
410 auto i
= connections
.find(profile
.connection_id
);
411 if (i
!= connections
.end()) {
412 profile
.conn_conf
= i
->second
;
416 if (connection_must_exist
&& !profile
.conn_conf
) {
417 ldout(cct
, 0) << "ERROR: remote connection undefined for sync profile" << dendl
;
421 if (profile
.conn_conf
&& default_profile
.conn_conf
) {
422 if (!profile
.conn_conf
->has_endpoint
) {
423 profile
.conn_conf
->endpoint
= default_profile
.conn_conf
->endpoint
;
425 if (!profile
.conn_conf
->has_host_style
) {
426 profile
.conn_conf
->host_style
= default_profile
.conn_conf
->host_style
;
428 if (!profile
.conn_conf
->has_key
) {
429 profile
.conn_conf
->key
= default_profile
.conn_conf
->key
;
433 ACLMappings acl_mappings
;
435 if (!profile
.acls_id
.empty()) {
436 if (!acl_profiles
.find(profile
.acls_id
, &acl_mappings
)) {
437 ldout(cct
, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile
.acls_id
<< dendl
;
440 profile
.acls
= acl_profiles
.acl_profiles
[profile
.acls_id
];
441 } else if (!profile
.acls
) {
442 if (default_profile
.acls
) {
443 profile
.acls
= default_profile
.acls
;
444 profile
.acls_id
= default_profile
.acls_id
;
448 if (profile
.target_path
.empty()) {
449 profile
.target_path
= default_profile
.target_path
;
451 if (profile
.target_path
.empty()) {
452 profile
.target_path
= default_target_path
;
458 int init_target(CephContext
*cct
, const JSONFormattable
& profile_conf
, std::shared_ptr
<AWSSyncConfig_Profile
> *ptarget
) {
459 std::shared_ptr
<AWSSyncConfig_Profile
> profile
;
460 profile
.reset(new AWSSyncConfig_Profile
);
461 profile
->init(profile_conf
);
463 int ret
= init_profile(cct
, profile_conf
, *profile
, true);
468 auto& sb
= profile
->source_bucket
;
470 if (explicit_profiles
.find(sb
) != explicit_profiles
.end()) {
471 ldout(cct
, 0) << "WARNING: duplicate target configuration in sync module" << dendl
;
474 explicit_profiles
[sb
] = profile
;
481 bool do_find_profile(const rgw_bucket bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *result
) {
482 const string
& name
= bucket
.name
;
483 auto iter
= explicit_profiles
.upper_bound(name
);
484 if (iter
== explicit_profiles
.begin()) {
489 if (iter
->first
.size() > name
.size()) {
492 if (name
.compare(0, iter
->first
.size(), iter
->first
) != 0) {
496 std::shared_ptr
<AWSSyncConfig_Profile
>& target
= iter
->second
;
498 if (!target
->prefix
&&
499 name
.size() != iter
->first
.size()) {
507 void find_profile(const rgw_bucket bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *result
) {
508 if (!do_find_profile(bucket
, result
)) {
509 *result
= root_profile
;
515 int init(CephContext
*cct
, const JSONFormattable
& config
) {
516 auto& default_conf
= config
["default"];
518 if (config
.exists("default")) {
519 default_profile
.init(default_conf
);
520 init_profile(cct
, default_conf
, default_profile
, false);
523 for (auto& conn
: config
["connections"].array()) {
524 auto new_conn
= conn
;
526 std::shared_ptr
<AWSSyncConfig_Connection
> c
{new AWSSyncConfig_Connection
};
529 connections
[new_conn
["id"]] = c
;
532 acl_profiles
.init(config
["acl_profiles"]);
534 int r
= s3
.init(cct
, config
["s3"]);
539 auto new_root_conf
= config
;
541 r
= init_target(cct
, new_root_conf
, &root_profile
); /* the root profile config */
546 for (auto target_conf
: config
["profiles"].array()) {
547 int r
= init_target(cct
, target_conf
, nullptr);
553 JSONFormatter
jf(true);
558 ldout(cct
, 5) << "sync module config (parsed representation):\n" << ss
.str() << dendl
;
563 void expand_target(RGWDataSyncCtx
*sc
, const string
& sid
, const string
& path
, string
*dest
) {
564 apply_meta_param(path
, "sid", sid
, dest
);
566 const RGWZoneGroup
& zg
= sc
->env
->svc
->zone
->get_zonegroup();
567 apply_meta_param(path
, "zonegroup", zg
.get_name(), dest
);
568 apply_meta_param(path
, "zonegroup_id", zg
.get_id(), dest
);
570 const RGWZone
& zone
= sc
->env
->svc
->zone
->get_zone();
571 apply_meta_param(path
, "zone", zone
.name
, dest
);
572 apply_meta_param(path
, "zone_id", zone
.id
, dest
);
575 void update_config(RGWDataSyncCtx
*sc
, const string
& sid
) {
576 expand_target(sc
, sid
, root_profile
->target_path
, &root_profile
->target_path
);
577 ldout(sc
->cct
, 20) << "updated target: (root) -> " << root_profile
->target_path
<< dendl
;
578 for (auto& t
: explicit_profiles
) {
579 expand_target(sc
, sid
, t
.second
->target_path
, &t
.second
->target_path
);
580 ldout(sc
->cct
, 20) << "updated target: " << t
.first
<< " -> " << t
.second
->target_path
<< dendl
;
584 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
585 Formatter::ObjectSection
config(jf
, "config");
586 root_profile
->dump_conf(cct
, jf
);
587 jf
.open_array_section("connections");
588 for (auto c
: connections
) {
589 c
.second
->dump_conf(cct
, jf
);
593 acl_profiles
.dump_conf(cct
, jf
);
596 Formatter::ArraySection
as(jf
, "profiles");
597 for (auto& t
: explicit_profiles
) {
598 Formatter::ObjectSection
target_section(jf
, "profile");
599 encode_json("name", t
.first
, &jf
);
600 t
.second
->dump_conf(cct
, jf
);
605 string
get_path(std::shared_ptr
<AWSSyncConfig_Profile
>& profile
,
606 const RGWBucketInfo
& bucket_info
,
607 const rgw_obj_key
& obj
) {
610 if (!bucket_info
.owner
.tenant
.empty()) {
611 bucket_str
= owner
= bucket_info
.owner
.tenant
+ "-";
612 owner
+= bucket_info
.owner
.id
;
614 bucket_str
+= bucket_info
.bucket
.name
;
616 const string
& path
= profile
->target_path
;
619 apply_meta_param(path
, "bucket", bucket_str
, &new_path
);
620 apply_meta_param(new_path
, "owner", owner
, &new_path
);
622 new_path
+= string("/") + get_key_oid(obj
);
627 void get_target(std::shared_ptr
<AWSSyncConfig_Profile
>& profile
,
628 const RGWBucketInfo
& bucket_info
,
629 const rgw_obj_key
& obj
,
632 string path
= get_path(profile
, bucket_info
, obj
);
633 size_t pos
= path
.find('/');
635 *bucket_name
= path
.substr(0, pos
);
636 *obj_name
= path
.substr(pos
+ 1);
639 void init_conns(RGWDataSyncCtx
*sc
, const string
& id
) {
640 auto sync_env
= sc
->env
;
642 update_config(sc
, id
);
644 auto& root_conf
= root_profile
->conn_conf
;
646 root_profile
->conn
.reset(new S3RESTConn(sc
->cct
,
649 { root_conf
->endpoint
},
651 root_conf
->host_style
));
653 for (auto i
: explicit_profiles
) {
656 c
->conn
.reset(new S3RESTConn(sc
->cct
,
659 { c
->conn_conf
->endpoint
},
661 c
->conn_conf
->host_style
));
667 struct AWSSyncInstanceEnv
{
671 explicit AWSSyncInstanceEnv(AWSSyncConfig
& _conf
) : conf(_conf
) {}
673 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) {
675 snprintf(buf
, sizeof(buf
), "%llx", (unsigned long long)instance_id
);
678 conf
.init_conns(sc
, id
);
681 void get_profile(const rgw_bucket
& bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *ptarget
) {
682 conf
.find_profile(bucket
, ptarget
);
683 ceph_assert(ptarget
);
687 static int do_decode_rest_obj(CephContext
*cct
, map
<string
, bufferlist
>& attrs
, map
<string
, string
>& headers
, rgw_rest_obj
*info
)
689 for (auto header
: headers
) {
690 const string
& val
= header
.second
;
691 if (header
.first
== "RGWX_OBJECT_SIZE") {
692 info
->content_len
= atoi(val
.c_str());
694 info
->attrs
[header
.first
] = val
;
698 info
->acls
.set_ctx(cct
);
699 auto aiter
= attrs
.find(RGW_ATTR_ACL
);
700 if (aiter
!= attrs
.end()) {
701 bufferlist
& bl
= aiter
->second
;
702 auto bliter
= bl
.cbegin();
704 info
->acls
.decode(bliter
);
705 } catch (buffer::error
& err
) {
706 ldout(cct
, 0) << "ERROR: failed to decode policy off attrs" << dendl
;
710 ldout(cct
, 0) << "WARNING: acl attrs not provided" << dendl
;
716 class RGWRESTStreamGetCRF
: public RGWStreamReadHTTPResourceCRF
721 RGWRESTConn::get_obj_params req_params
;
723 rgw_sync_aws_src_obj_properties src_properties
;
725 RGWRESTStreamGetCRF(CephContext
*_cct
,
726 RGWCoroutinesEnv
*_env
,
727 RGWCoroutine
*_caller
,
731 const rgw_sync_aws_src_obj_properties
& _src_properties
) : RGWStreamReadHTTPResourceCRF(_cct
, _env
, _caller
,
732 _sc
->env
->http_manager
, _src_obj
.key
),
733 sc(_sc
), conn(_conn
), src_obj(_src_obj
),
734 src_properties(_src_properties
) {
737 int init() override
{
738 /* init input connection */
741 req_params
.get_op
= true;
742 req_params
.prepend_metadata
= true;
744 req_params
.unmod_ptr
= &src_properties
.mtime
;
745 req_params
.etag
= src_properties
.etag
;
746 req_params
.mod_zone_id
= src_properties
.zone_short_id
;
747 req_params
.mod_pg_ver
= src_properties
.pg_ver
;
750 req_params
.range_is_set
= true;
751 req_params
.range_start
= range
.ofs
;
752 req_params
.range_end
= range
.ofs
+ range
.size
- 1;
755 RGWRESTStreamRWRequest
*in_req
;
756 int ret
= conn
->get_obj(src_obj
, req_params
, false /* send */, &in_req
);
758 ldout(sc
->cct
, 0) << "ERROR: " << __func__
<< "(): conn->get_obj() returned ret=" << ret
<< dendl
;
764 return RGWStreamReadHTTPResourceCRF::init();
767 int decode_rest_obj(map
<string
, string
>& headers
, bufferlist
& extra_data
) override
{
768 map
<string
, bufferlist
> src_attrs
;
770 ldout(sc
->cct
, 20) << __func__
<< ":" << " headers=" << headers
<< " extra_data.length()=" << extra_data
.length() << dendl
;
772 if (extra_data
.length() > 0) {
774 if (!jp
.parse(extra_data
.c_str(), extra_data
.length())) {
775 ldout(sc
->cct
, 0) << "ERROR: failed to parse response extra data. len=" << extra_data
.length() << " data=" << extra_data
.c_str() << dendl
;
779 JSONDecoder::decode_json("attrs", src_attrs
, &jp
);
781 return do_decode_rest_obj(sc
->cct
, src_attrs
, headers
, &rest_obj
);
784 bool need_extra_data() override
{
789 static std::set
<string
> keep_headers
= { "CONTENT_TYPE",
791 "CONTENT_DISPOSITION",
792 "CONTENT_LANGUAGE" };
794 class RGWAWSStreamPutCRF
: public RGWStreamWriteHTTPResourceCRF
797 rgw_sync_aws_src_obj_properties src_properties
;
798 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
802 RGWAWSStreamPutCRF(CephContext
*_cct
,
803 RGWCoroutinesEnv
*_env
,
804 RGWCoroutine
*_caller
,
806 const rgw_sync_aws_src_obj_properties
& _src_properties
,
807 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
808 rgw_obj
& _dest_obj
) : RGWStreamWriteHTTPResourceCRF(_cct
, _env
, _caller
, _sc
->env
->http_manager
),
809 sc(_sc
), src_properties(_src_properties
), target(_target
), dest_obj(_dest_obj
) {
812 int init() override
{
813 /* init output connection */
814 RGWRESTStreamS3PutObj
*out_req
{nullptr};
816 if (multipart
.is_multipart
) {
818 snprintf(buf
, sizeof(buf
), "%d", multipart
.part_num
);
819 rgw_http_param_pair params
[] = { { "uploadId", multipart
.upload_id
.c_str() },
820 { "partNumber", buf
},
821 { nullptr, nullptr } };
822 target
->conn
->put_obj_send_init(dest_obj
, params
, &out_req
);
824 target
->conn
->put_obj_send_init(dest_obj
, nullptr, &out_req
);
829 return RGWStreamWriteHTTPResourceCRF::init();
832 static bool keep_attr(const string
& h
) {
833 return (keep_headers
.find(h
) != keep_headers
.end() ||
834 boost::algorithm::starts_with(h
, "X_AMZ_"));
837 static void init_send_attrs(CephContext
*cct
,
838 const rgw_rest_obj
& rest_obj
,
839 const rgw_sync_aws_src_obj_properties
& src_properties
,
840 const AWSSyncConfig_Profile
*target
,
841 map
<string
, string
> *attrs
) {
842 auto& new_attrs
= *attrs
;
846 for (auto& hi
: rest_obj
.attrs
) {
847 if (keep_attr(hi
.first
)) {
848 new_attrs
.insert(hi
);
852 auto acl
= rest_obj
.acls
.get_acl();
854 map
<int, vector
<string
> > access_map
;
857 for (auto& grant
: acl
.get_grant_map()) {
858 auto& orig_grantee
= grant
.first
;
859 auto& perm
= grant
.second
;
863 const auto& am
= target
->acls
->acl_mappings
;
865 auto iter
= am
.find(orig_grantee
);
866 if (iter
== am
.end()) {
867 ldout(cct
, 20) << "acl_mappings: Could not find " << orig_grantee
<< " .. ignoring" << dendl
;
871 grantee
= iter
->second
.dest_id
;
875 switch (iter
->second
.type
) {
876 case ACL_TYPE_CANON_USER
:
879 case ACL_TYPE_EMAIL_USER
:
880 type
= "emailAddress";
889 string tv
= type
+ "=" + grantee
;
891 int flags
= perm
.get_permission().get_permissions();
892 if ((flags
& RGW_PERM_FULL_CONTROL
) == RGW_PERM_FULL_CONTROL
) {
893 access_map
[flags
].push_back(tv
);
897 for (int i
= 1; i
<= RGW_PERM_WRITE_ACP
; i
<<= 1) {
899 access_map
[i
].push_back(tv
);
905 for (auto aiter
: access_map
) {
906 int grant_type
= aiter
.first
;
908 string
header_str("x-amz-grant-");
910 switch (grant_type
) {
912 header_str
.append("read");
915 header_str
.append("write");
917 case RGW_PERM_READ_ACP
:
918 header_str
.append("read-acp");
920 case RGW_PERM_WRITE_ACP
:
921 header_str
.append("write-acp");
923 case RGW_PERM_FULL_CONTROL
:
924 header_str
.append("full-control");
930 for (auto viter
: aiter
.second
) {
937 ldout(cct
, 20) << "acl_mappings: set acl: " << header_str
<< "=" << s
<< dendl
;
939 new_attrs
[header_str
] = s
;
943 snprintf(buf
, sizeof(buf
), "%llu", (long long)src_properties
.versioned_epoch
);
944 new_attrs
["x-amz-meta-rgwx-versioned-epoch"] = buf
;
946 utime_t
ut(src_properties
.mtime
);
947 snprintf(buf
, sizeof(buf
), "%lld.%09lld",
949 (long long)ut
.nsec());
951 new_attrs
["x-amz-meta-rgwx-source-mtime"] = buf
;
952 new_attrs
["x-amz-meta-rgwx-source-etag"] = src_properties
.etag
;
953 new_attrs
["x-amz-meta-rgwx-source-key"] = rest_obj
.key
.name
;
954 if (!rest_obj
.key
.instance
.empty()) {
955 new_attrs
["x-amz-meta-rgwx-source-version-id"] = rest_obj
.key
.instance
;
959 void send_ready(const rgw_rest_obj
& rest_obj
) override
{
960 RGWRESTStreamS3PutObj
*r
= static_cast<RGWRESTStreamS3PutObj
*>(req
);
962 map
<string
, string
> new_attrs
;
963 if (!multipart
.is_multipart
) {
964 init_send_attrs(sc
->cct
, rest_obj
, src_properties
, target
.get(), &new_attrs
);
967 r
->set_send_length(rest_obj
.content_len
);
969 RGWAccessControlPolicy policy
;
971 r
->send_ready(target
->conn
->get_key(), new_attrs
, policy
, false);
974 void handle_headers(const map
<string
, string
>& headers
) {
975 for (auto h
: headers
) {
976 if (h
.first
== "ETAG") {
982 bool get_etag(string
*petag
) {
992 class RGWAWSStreamObjToCloudPlainCR
: public RGWCoroutine
{
994 RGWRESTConn
*source_conn
;
995 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
999 rgw_sync_aws_src_obj_properties src_properties
;
1001 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
1002 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
1005 RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx
*_sc
,
1006 RGWRESTConn
*_source_conn
,
1007 const rgw_obj
& _src_obj
,
1008 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1009 std::shared_ptr
<AWSSyncConfig_Profile
> _target
,
1010 const rgw_obj
& _dest_obj
) : RGWCoroutine(_sc
->cct
),
1012 source_conn(_source_conn
),
1015 dest_obj(_dest_obj
),
1016 src_properties(_src_properties
) {}
1018 int operate() override
{
1021 in_crf
.reset(new RGWRESTStreamGetCRF(cct
, get_env(), this, sc
,
1022 source_conn
, src_obj
,
1026 out_crf
.reset(new RGWAWSStreamPutCRF(cct
, get_env(), this, sc
,
1027 src_properties
, target
, dest_obj
));
1029 yield
call(new RGWStreamSpliceCR(cct
, sc
->env
->http_manager
, in_crf
, out_crf
));
1031 return set_cr_error(retcode
);
1034 return set_cr_done();
1041 class RGWAWSStreamObjToCloudMultipartPartCR
: public RGWCoroutine
{
1043 RGWRESTConn
*source_conn
;
1044 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1048 rgw_sync_aws_src_obj_properties src_properties
;
1052 rgw_sync_aws_multipart_part_info part_info
;
1054 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
1055 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
1060 RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx
*_sc
,
1061 RGWRESTConn
*_source_conn
,
1062 const rgw_obj
& _src_obj
,
1063 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
1064 const rgw_obj
& _dest_obj
,
1065 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1066 const string
& _upload_id
,
1067 const rgw_sync_aws_multipart_part_info
& _part_info
,
1068 string
*_petag
) : RGWCoroutine(_sc
->cct
),
1070 source_conn(_source_conn
),
1073 dest_obj(_dest_obj
),
1074 src_properties(_src_properties
),
1075 upload_id(_upload_id
),
1076 part_info(_part_info
),
1079 int operate() override
{
1082 in_crf
.reset(new RGWRESTStreamGetCRF(cct
, get_env(), this, sc
,
1083 source_conn
, src_obj
,
1086 in_crf
->set_range(part_info
.ofs
, part_info
.size
);
1089 out_crf
.reset(new RGWAWSStreamPutCRF(cct
, get_env(), this, sc
,
1090 src_properties
, target
, dest_obj
));
1092 out_crf
->set_multipart(upload_id
, part_info
.part_num
, part_info
.size
);
1094 yield
call(new RGWStreamSpliceCR(cct
, sc
->env
->http_manager
, in_crf
, out_crf
));
1096 return set_cr_error(retcode
);
1099 if (!(static_cast<RGWAWSStreamPutCRF
*>(out_crf
.get()))->get_etag(petag
)) {
1100 ldout(sc
->cct
, 0) << "ERROR: failed to get etag from PUT request" << dendl
;
1101 return set_cr_error(-EIO
);
1104 return set_cr_done();
1111 class RGWAWSAbortMultipartCR
: public RGWCoroutine
{
1113 RGWRESTConn
*dest_conn
;
1119 RGWAWSAbortMultipartCR(RGWDataSyncCtx
*_sc
,
1120 RGWRESTConn
*_dest_conn
,
1121 const rgw_obj
& _dest_obj
,
1122 const string
& _upload_id
) : RGWCoroutine(_sc
->cct
),
1124 dest_conn(_dest_conn
),
1125 dest_obj(_dest_obj
),
1126 upload_id(_upload_id
) {}
1128 int operate() override
{
1132 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
1134 call(new RGWDeleteRESTResourceCR(sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1135 obj_to_aws_path(dest_obj
), params
));
1139 ldout(sc
->cct
, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj
<< " (retcode=" << retcode
<< ")" << dendl
;
1140 return set_cr_error(retcode
);
1143 return set_cr_done();
1150 class RGWAWSInitMultipartCR
: public RGWCoroutine
{
1152 RGWRESTConn
*dest_conn
;
1156 map
<string
, string
> attrs
;
1162 struct InitMultipartResult
{
1167 void decode_xml(XMLObj
*obj
) {
1168 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
1169 RGWXMLDecoder::decode_xml("Key", key
, obj
);
1170 RGWXMLDecoder::decode_xml("UploadId", upload_id
, obj
);
1175 RGWAWSInitMultipartCR(RGWDataSyncCtx
*_sc
,
1176 RGWRESTConn
*_dest_conn
,
1177 const rgw_obj
& _dest_obj
,
1179 const map
<string
, string
>& _attrs
,
1180 string
*_upload_id
) : RGWCoroutine(_sc
->cct
),
1182 dest_conn(_dest_conn
),
1183 dest_obj(_dest_obj
),
1184 obj_size(_obj_size
),
1186 upload_id(_upload_id
) {}
1188 int operate() override
{
1192 rgw_http_param_pair params
[] = { { "uploads", nullptr }, {nullptr, nullptr} };
1194 call(new RGWPostRawRESTResourceCR
<bufferlist
> (sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1195 obj_to_aws_path(dest_obj
), params
, &attrs
, bl
, &out_bl
));
1199 ldout(sc
->cct
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
1200 return set_cr_error(retcode
);
1204 * If one of the following fails we cannot abort upload, as we cannot
1205 * extract the upload id. If one of these fail it's very likely that that's
1206 * the least of our problem.
1208 RGWXMLDecoder::XMLParser parser
;
1209 if (!parser
.init()) {
1210 ldout(sc
->cct
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1211 return set_cr_error(-EIO
);
1214 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1215 string
str(out_bl
.c_str(), out_bl
.length());
1216 ldout(sc
->cct
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1217 return set_cr_error(-EIO
);
1221 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result
, &parser
, true);
1222 } catch (RGWXMLDecoder::err
& err
) {
1223 string
str(out_bl
.c_str(), out_bl
.length());
1224 ldout(sc
->cct
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1225 return set_cr_error(-EIO
);
1229 ldout(sc
->cct
, 20) << "init multipart result: bucket=" << result
.bucket
<< " key=" << result
.key
<< " upload_id=" << result
.upload_id
<< dendl
;
1231 *upload_id
= result
.upload_id
;
1233 return set_cr_done();
1240 class RGWAWSCompleteMultipartCR
: public RGWCoroutine
{
1242 RGWRESTConn
*dest_conn
;
1249 struct CompleteMultipartReq
{
1250 map
<int, rgw_sync_aws_multipart_part_info
> parts
;
1252 explicit CompleteMultipartReq(const map
<int, rgw_sync_aws_multipart_part_info
>& _parts
) : parts(_parts
) {}
1254 void dump_xml(Formatter
*f
) const {
1255 for (auto p
: parts
) {
1256 f
->open_object_section("Part");
1257 encode_xml("PartNumber", p
.first
, f
);
1258 encode_xml("ETag", p
.second
.etag
, f
);
1264 struct CompleteMultipartResult
{
1270 void decode_xml(XMLObj
*obj
) {
1271 RGWXMLDecoder::decode_xml("Location", bucket
, obj
);
1272 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
1273 RGWXMLDecoder::decode_xml("Key", key
, obj
);
1274 RGWXMLDecoder::decode_xml("ETag", etag
, obj
);
1279 RGWAWSCompleteMultipartCR(RGWDataSyncCtx
*_sc
,
1280 RGWRESTConn
*_dest_conn
,
1281 const rgw_obj
& _dest_obj
,
1283 const map
<int, rgw_sync_aws_multipart_part_info
>& _parts
) : RGWCoroutine(_sc
->cct
),
1285 dest_conn(_dest_conn
),
1286 dest_obj(_dest_obj
),
1287 upload_id(_upload_id
),
1290 int operate() override
{
1294 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
1296 XMLFormatter formatter
;
1298 encode_xml("CompleteMultipartUpload", req_enc
, &formatter
);
1300 formatter
.flush(ss
);
1303 bl
.append(ss
.str());
1305 call(new RGWPostRawRESTResourceCR
<bufferlist
> (sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1306 obj_to_aws_path(dest_obj
), params
, nullptr, bl
, &out_bl
));
1310 ldout(sc
->cct
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
1311 return set_cr_error(retcode
);
1315 * If one of the following fails we cannot abort upload, as we cannot
1316 * extract the upload id. If one of these fail it's very likely that that's
1317 * the least of our problem.
1319 RGWXMLDecoder::XMLParser parser
;
1320 if (!parser
.init()) {
1321 ldout(sc
->cct
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1322 return set_cr_error(-EIO
);
1325 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1326 string
str(out_bl
.c_str(), out_bl
.length());
1327 ldout(sc
->cct
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1328 return set_cr_error(-EIO
);
1332 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result
, &parser
, true);
1333 } catch (RGWXMLDecoder::err
& err
) {
1334 string
str(out_bl
.c_str(), out_bl
.length());
1335 ldout(sc
->cct
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1336 return set_cr_error(-EIO
);
1340 ldout(sc
->cct
, 20) << "complete multipart result: location=" << result
.location
<< " bucket=" << result
.bucket
<< " key=" << result
.key
<< " etag=" << result
.etag
<< dendl
;
1342 return set_cr_done();
1350 class RGWAWSStreamAbortMultipartUploadCR
: public RGWCoroutine
{
1352 RGWRESTConn
*dest_conn
;
1353 const rgw_obj dest_obj
;
1354 const rgw_raw_obj status_obj
;
1360 RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx
*_sc
,
1361 RGWRESTConn
*_dest_conn
,
1362 const rgw_obj
& _dest_obj
,
1363 const rgw_raw_obj
& _status_obj
,
1364 const string
& _upload_id
) : RGWCoroutine(_sc
->cct
), sc(_sc
),
1365 dest_conn(_dest_conn
),
1366 dest_obj(_dest_obj
),
1367 status_obj(_status_obj
),
1368 upload_id(_upload_id
) {}
1370 int operate() override
{
1372 yield
call(new RGWAWSAbortMultipartCR(sc
, dest_conn
, dest_obj
, upload_id
));
1374 ldout(sc
->cct
, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj
<< " upload_id=" << upload_id
<< " retcode=" << retcode
<< dendl
;
1375 /* ignore error, best effort */
1377 yield
call(new RGWRadosRemoveCR(sc
->env
->store
, status_obj
));
1379 ldout(sc
->cct
, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj
<< " retcode=" << retcode
<< dendl
;
1380 /* ignore error, best effort */
1382 return set_cr_done();
1389 class RGWAWSStreamObjToCloudMultipartCR
: public RGWCoroutine
{
1391 RGWDataSyncEnv
*sync_env
;
1392 AWSSyncConfig
& conf
;
1393 RGWRESTConn
*source_conn
;
1394 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1400 rgw_sync_aws_src_obj_properties src_properties
;
1401 rgw_rest_obj rest_obj
;
1403 rgw_sync_aws_multipart_upload_info status
;
1405 map
<string
, string
> new_attrs
;
1407 rgw_sync_aws_multipart_part_info
*pcur_part_info
{nullptr};
1411 rgw_raw_obj status_obj
;
1414 RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx
*_sc
,
1415 rgw_bucket_sync_pipe
& _sync_pipe
,
1416 AWSSyncConfig
& _conf
,
1417 RGWRESTConn
*_source_conn
,
1418 const rgw_obj
& _src_obj
,
1419 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
1420 const rgw_obj
& _dest_obj
,
1422 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1423 const rgw_rest_obj
& _rest_obj
) : RGWCoroutine(_sc
->cct
),
1427 source_conn(_source_conn
),
1430 dest_obj(_dest_obj
),
1431 obj_size(_obj_size
),
1432 src_properties(_src_properties
),
1433 rest_obj(_rest_obj
),
1434 status_obj(sync_env
->svc
->zone
->get_zone_params().log_pool
,
1435 RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe
, sc
->source_zone
, src_obj
)) {
1439 int operate() override
{
1441 yield
call(new RGWSimpleRadosReadCR
<rgw_sync_aws_multipart_upload_info
>(sync_env
->async_rados
, sync_env
->svc
->sysobj
,
1442 status_obj
, &status
, false));
1444 if (retcode
< 0 && retcode
!= -ENOENT
) {
1445 ldout(sc
->cct
, 0) << "ERROR: failed to read sync status of object " << src_obj
<< " retcode=" << retcode
<< dendl
;
1450 /* check here that mtime and size did not change */
1452 if (status
.src_properties
.mtime
!= src_properties
.mtime
|| status
.obj_size
!= obj_size
||
1453 status
.src_properties
.etag
!= src_properties
.etag
) {
1454 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1459 if (retcode
== -ENOENT
) {
1460 RGWAWSStreamPutCRF::init_send_attrs(sc
->cct
, rest_obj
, src_properties
, target
.get(), &new_attrs
);
1462 yield
call(new RGWAWSInitMultipartCR(sc
, target
->conn
.get(), dest_obj
, status
.obj_size
, std::move(new_attrs
), &status
.upload_id
));
1464 return set_cr_error(retcode
);
1467 status
.obj_size
= obj_size
;
1468 status
.src_properties
= src_properties
;
1469 #define MULTIPART_MAX_PARTS 10000
1470 uint64_t min_part_size
= obj_size
/ MULTIPART_MAX_PARTS
;
1471 status
.part_size
= std::max(conf
.s3
.multipart_min_part_size
, min_part_size
);
1472 status
.num_parts
= (obj_size
+ status
.part_size
- 1) / status
.part_size
;
1473 status
.cur_part
= 1;
1476 for (; (uint32_t)status
.cur_part
<= status
.num_parts
; ++status
.cur_part
) {
1478 rgw_sync_aws_multipart_part_info
& cur_part_info
= status
.parts
[status
.cur_part
];
1479 cur_part_info
.part_num
= status
.cur_part
;
1480 cur_part_info
.ofs
= status
.cur_ofs
;
1481 cur_part_info
.size
= std::min((uint64_t)status
.part_size
, status
.obj_size
- status
.cur_ofs
);
1483 pcur_part_info
= &cur_part_info
;
1485 status
.cur_ofs
+= status
.part_size
;
1487 call(new RGWAWSStreamObjToCloudMultipartPartCR(sc
,
1488 source_conn
, src_obj
,
1491 status
.src_properties
,
1494 &cur_part_info
.etag
));
1498 ldout(sc
->cct
, 0) << "ERROR: failed to sync obj=" << src_obj
<< ", sync via multipart upload, upload_id=" << status
.upload_id
<< " part number " << status
.cur_part
<< " (error: " << cpp_strerror(-retcode
) << ")" << dendl
;
1500 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1501 return set_cr_error(ret_err
);
1504 yield
call(new RGWSimpleRadosWriteCR
<rgw_sync_aws_multipart_upload_info
>(sync_env
->async_rados
, sync_env
->svc
->sysobj
, status_obj
, status
));
1506 ldout(sc
->cct
, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode
<< dendl
;
1507 /* continue with upload anyway */
1509 ldout(sc
->cct
, 20) << "sync of object=" << src_obj
<< " via multipart upload, finished sending part #" << status
.cur_part
<< " etag=" << pcur_part_info
->etag
<< dendl
;
1512 yield
call(new RGWAWSCompleteMultipartCR(sc
, target
->conn
.get(), dest_obj
, status
.upload_id
, status
.parts
));
1514 ldout(sc
->cct
, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj
<< " (error: " << cpp_strerror(-retcode
) << ")" << dendl
;
1516 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1517 return set_cr_error(ret_err
);
1520 /* remove status obj */
1521 yield
call(new RGWRadosRemoveCR(sync_env
->store
, status_obj
));
1523 ldout(sc
->cct
, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj
<< " upload_id=" << status
.upload_id
<< " part number " << status
.cur_part
<< " (" << cpp_strerror(-retcode
) << ")" << dendl
;
1524 /* ignore error, best effort */
1526 return set_cr_done();
1533 int decode_attr(map
<string
, bufferlist
>& attrs
, const char *attr_name
, T
*result
, T def_val
)
1535 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
1536 if (iter
== attrs
.end()) {
1540 bufferlist
& bl
= iter
->second
;
1541 if (bl
.length() == 0) {
1545 auto bliter
= bl
.cbegin();
1547 decode(*result
, bliter
);
1548 } catch (buffer::error
& err
) {
1554 // maybe use Fetch Remote Obj instead?
1555 class RGWAWSHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
1556 rgw_bucket_sync_pipe sync_pipe
;
1557 AWSSyncInstanceEnv
& instance
;
1559 uint64_t versioned_epoch
{0};
1561 RGWRESTConn
*source_conn
{nullptr};
1562 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1564 unordered_map
<string
, bool> bucket_created
;
1565 string target_bucket_name
;
1566 string target_obj_name
;
1567 rgw_rest_obj rest_obj
;
1570 uint32_t src_zone_short_id
{0};
1571 uint64_t src_pg_ver
{0};
1575 struct CreateBucketResult
{
1578 void decode_xml(XMLObj
*obj
) {
1579 RGWXMLDecoder::decode_xml("Code", code
, obj
);
1584 RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1585 rgw_bucket_sync_pipe
& _sync_pipe
,
1587 AWSSyncInstanceEnv
& _instance
,
1588 uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1589 sync_pipe(_sync_pipe
),
1590 instance(_instance
), versioned_epoch(_versioned_epoch
)
1593 ~RGWAWSHandleRemoteObjCBCR(){
1596 int operate() override
{
1598 ret
= decode_attr(attrs
, RGW_ATTR_PG_VER
, &src_pg_ver
, (uint64_t)0);
1600 ldout(sc
->cct
, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl
;
1602 ret
= decode_attr(attrs
, RGW_ATTR_SOURCE_ZONE
, &src_zone_short_id
, (uint32_t)0);
1604 ldout(sc
->cct
, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl
;
1605 src_pg_ver
= 0; /* all or nothing */
1608 ldout(sc
->cct
, 4) << "AWS: download begin: z=" << sc
->source_zone
1609 << " b=" << src_bucket
<< " k=" << key
<< " size=" << size
1610 << " mtime=" << mtime
<< " etag=" << etag
1611 << " zone_short_id=" << src_zone_short_id
<< " pg_ver=" << src_pg_ver
1614 source_conn
= sync_env
->svc
->zone
->get_zone_conn(sc
->source_zone
);
1616 ldout(sc
->cct
, 0) << "ERROR: cannot find http connection to zone " << sc
->source_zone
<< dendl
;
1617 return set_cr_error(-EINVAL
);
1620 instance
.get_profile(sync_pipe
.info
.source_bs
.bucket
, &target
);
1621 instance
.conf
.get_target(target
, sync_pipe
.dest_bucket_info
, key
, &target_bucket_name
, &target_obj_name
);
1623 if (bucket_created
.find(target_bucket_name
) == bucket_created
.end()){
1625 ldout(sc
->cct
,0) << "AWS: creating bucket " << target_bucket_name
<< dendl
;
1627 call(new RGWPutRawRESTResourceCR
<bufferlist
> (sc
->cct
, target
->conn
.get(),
1628 sync_env
->http_manager
,
1629 target_bucket_name
, nullptr, bl
, &out_bl
));
1632 RGWXMLDecoder::XMLParser parser
;
1633 if (!parser
.init()) {
1634 ldout(sc
->cct
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1635 return set_cr_error(retcode
);
1638 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1639 string
str(out_bl
.c_str(), out_bl
.length());
1640 ldout(sc
->cct
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1641 return set_cr_error(retcode
);
1645 RGWXMLDecoder::decode_xml("Error", result
, &parser
, true);
1646 } catch (RGWXMLDecoder::err
& err
) {
1647 string
str(out_bl
.c_str(), out_bl
.length());
1648 ldout(sc
->cct
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1649 return set_cr_error(retcode
);
1652 if (result
.code
!= "BucketAlreadyOwnedByYou") {
1653 return set_cr_error(retcode
);
1657 bucket_created
[target_bucket_name
] = true;
1661 rgw_obj
src_obj(src_bucket
, key
);
1664 rgw_bucket target_bucket
;
1665 target_bucket
.name
= target_bucket_name
; /* this is only possible because we only use bucket name for
1667 rgw_obj
dest_obj(target_bucket
, target_obj_name
);
1670 rgw_sync_aws_src_obj_properties src_properties
;
1671 src_properties
.mtime
= mtime
;
1672 src_properties
.etag
= etag
;
1673 src_properties
.zone_short_id
= src_zone_short_id
;
1674 src_properties
.pg_ver
= src_pg_ver
;
1675 src_properties
.versioned_epoch
= versioned_epoch
;
1677 if (size
< instance
.conf
.s3
.multipart_sync_threshold
) {
1678 call(new RGWAWSStreamObjToCloudPlainCR(sc
, source_conn
, src_obj
,
1683 rgw_rest_obj rest_obj
;
1685 if (do_decode_rest_obj(sc
->cct
, attrs
, headers
, &rest_obj
)) {
1686 ldout(sc
->cct
, 0) << "ERROR: failed to decode rest obj out of headers=" << headers
<< ", attrs=" << attrs
<< dendl
;
1687 return set_cr_error(-EINVAL
);
1689 call(new RGWAWSStreamObjToCloudMultipartCR(sc
, sync_pipe
, instance
.conf
, source_conn
, src_obj
,
1690 target
, dest_obj
, size
, src_properties
, rest_obj
));
1694 return set_cr_error(retcode
);
1697 return set_cr_done();
1704 class RGWAWSHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
1705 rgw_bucket_sync_pipe sync_pipe
;
1706 AWSSyncInstanceEnv
& instance
;
1707 uint64_t versioned_epoch
;
1709 RGWAWSHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
1710 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1711 AWSSyncInstanceEnv
& _instance
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1712 sync_pipe(_sync_pipe
),
1713 instance(_instance
), versioned_epoch(_versioned_epoch
) {
1716 ~RGWAWSHandleRemoteObjCR() {}
1718 RGWStatRemoteObjCBCR
*allocate_callback() override
{
1719 return new RGWAWSHandleRemoteObjCBCR(sc
, sync_pipe
, key
, instance
, versioned_epoch
);
1723 class RGWAWSRemoveRemoteObjCBCR
: public RGWCoroutine
{
1725 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1726 rgw_bucket_sync_pipe sync_pipe
;
1728 ceph::real_time mtime
;
1729 AWSSyncInstanceEnv
& instance
;
1732 RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1733 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
1734 AWSSyncInstanceEnv
& _instance
) : RGWCoroutine(_sc
->cct
), sc(_sc
),
1735 sync_pipe(_sync_pipe
), key(_key
),
1736 mtime(_mtime
), instance(_instance
) {}
1737 int operate() override
{
1739 ldout(sc
->cct
, 0) << ": remove remote obj: z=" << sc
->source_zone
1740 << " b=" <<sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
1742 instance
.get_profile(sync_pipe
.info
.source_bs
.bucket
, &target
);
1743 string path
= instance
.conf
.get_path(target
, sync_pipe
.dest_bucket_info
, key
);
1744 ldout(sc
->cct
, 0) << "AWS: removing aws object at" << path
<< dendl
;
1746 call(new RGWDeleteRESTResourceCR(sc
->cct
, target
->conn
.get(),
1747 sc
->env
->http_manager
,
1748 path
, nullptr /* params */));
1751 return set_cr_error(retcode
);
1753 return set_cr_done();
1761 class RGWAWSDataSyncModule
: public RGWDataSyncModule
{
1763 AWSSyncInstanceEnv instance
;
1765 RGWAWSDataSyncModule(CephContext
*_cct
, AWSSyncConfig
& _conf
) :
1770 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
1771 instance
.init(sc
, instance_id
);
1774 ~RGWAWSDataSyncModule() {}
1776 RGWCoroutine
*sync_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
,
1777 std::optional
<uint64_t> versioned_epoch
,
1778 rgw_zone_set
*zones_trace
) override
{
1779 ldout(sc
->cct
, 0) << instance
.id
<< ": sync_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
1780 return new RGWAWSHandleRemoteObjCR(sc
, sync_pipe
, key
, instance
, versioned_epoch
.value_or(0));
1782 RGWCoroutine
*remove_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
,
1783 rgw_zone_set
*zones_trace
) override
{
1784 ldout(sc
->cct
, 0) <<"rm_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1785 return new RGWAWSRemoveRemoteObjCBCR(sc
, sync_pipe
, key
, mtime
, instance
);
1787 RGWCoroutine
*create_delete_marker(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
,
1788 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
,
1789 rgw_zone_set
*zones_trace
) override
{
1790 ldout(sc
->cct
, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
1791 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1796 class RGWAWSSyncModuleInstance
: public RGWSyncModuleInstance
{
1797 RGWAWSDataSyncModule data_handler
;
1799 RGWAWSSyncModuleInstance(CephContext
*cct
, AWSSyncConfig
& _conf
) : data_handler(cct
, _conf
) {}
1800 RGWDataSyncModule
*get_data_handler() override
{
1801 return &data_handler
;
1805 int RGWAWSSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
){
1808 int r
= conf
.init(cct
, config
);
1813 instance
->reset(new RGWAWSSyncModuleInstance(cct
, conf
));