1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/errno.h"
6 #include "rgw_common.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_sync_module.h"
9 #include "rgw_data_sync.h"
10 #include "rgw_sync_module_aws.h"
11 #include "rgw_cr_rados.h"
12 #include "rgw_rest_conn.h"
13 #include "rgw_cr_rest.h"
17 #include "services/svc_zone.h"
19 #include <boost/asio/yield.hpp>
21 #define dout_subsys ceph_subsys_rgw
24 #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
28 static string default_target_path
= "rgw-${zonegroup}-${sid}/${bucket}";
30 static string
get_key_oid(const rgw_obj_key
& key
)
32 string oid
= key
.name
;
33 if (!key
.instance
.empty() &&
34 !key
.have_null_instance()) {
35 oid
+= string(":") + key
.instance
;
40 static string
obj_to_aws_path(const rgw_obj
& obj
)
42 return obj
.bucket
.name
+ "/" + get_key_oid(obj
.key
);
47 json configuration definition:
51 "access_key": <access>,
53 "endpoint": <endpoint>,
54 "host_style": <path | virtual>,
56 "acls": [ { "type": <id | email | uri>,
57 "source_id": <source_id>,
58 "dest_id": <dest_id> } ... ], # optional, acl mappings, no mappings if does not exist
59 "target_path": <target_path>, # override default
62 # anything below here is for non trivial configuration
63 # can be used in conjunction with the above
67 "access_key": <access>,
69 "endpoint": <endpoint>,
70 "host_style": <path | virtual>,
72 "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
74 "type" : <id | email | uri>, # optional, default is id
78 "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
79 # final object name will be target_path + "/" + obj
84 "access_key": <access>,
86 "endpoint": <endpoint>,
90 "id": <id>, # acl mappings
92 "type": <id | email | uri>,
100 "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
101 "target_path": <dest>, # (override default)
102 "connection_id": <connection_id>, # optional, if empty references default connection
103 "acls_id": <mappings_id>, # optional, if empty references default mappings
107 target path optional variables:
110 sid: sync instance id, randomly generated by sync process on first sync initialization
111 zonegroup: zonegroup name
112 zonegroup_id: zonegroup id
116 (evaluated when syncing)
123 ACLGranteeTypeEnum type
{ACL_TYPE_CANON_USER
};
127 ACLMapping() = default;
129 ACLMapping(ACLGranteeTypeEnum t
,
131 const string
& d
) : type(t
),
135 void init(const JSONFormattable
& config
) {
136 const string
& t
= config
["type"];
139 type
= ACL_TYPE_EMAIL_USER
;
140 } else if (t
== "uri") {
141 type
= ACL_TYPE_GROUP
;
143 type
= ACL_TYPE_CANON_USER
;
146 source_id
= config
["source_id"];
147 dest_id
= config
["dest_id"];
150 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
151 Formatter::ObjectSection
os(jf
, "acl_mapping");
154 case ACL_TYPE_EMAIL_USER
:
164 encode_json("type", s
, &jf
);
165 encode_json("source_id", source_id
, &jf
);
166 encode_json("dest_id", dest_id
, &jf
);
171 map
<string
, ACLMapping
> acl_mappings
;
173 void init(const JSONFormattable
& config
) {
174 for (auto& c
: config
.array()) {
178 acl_mappings
.emplace(std::make_pair(m
.source_id
, m
));
181 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
182 Formatter::ArraySection
os(jf
, "acls");
184 for (auto& i
: acl_mappings
) {
185 i
.second
.dump_conf(cct
, jf
);
190 struct AWSSyncConfig_ACLProfiles
{
191 map
<string
, std::shared_ptr
<ACLMappings
> > acl_profiles
;
193 void init(const JSONFormattable
& config
) {
194 for (auto& c
: config
.array()) {
195 const string
& profile_id
= c
["id"];
197 std::shared_ptr
<ACLMappings
> ap
{new ACLMappings
};
200 acl_profiles
[profile_id
] = ap
;
203 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
204 Formatter::ArraySection
section(jf
, "acl_profiles");
206 for (auto& p
: acl_profiles
) {
207 Formatter::ObjectSection
section(jf
, "profile");
208 encode_json("id", p
.first
, &jf
);
209 p
.second
->dump_conf(cct
, jf
);
213 bool find(const string
& profile_id
, ACLMappings
*result
) const {
214 auto iter
= acl_profiles
.find(profile_id
);
215 if (iter
== acl_profiles
.end()) {
218 *result
= *iter
->second
;
223 struct AWSSyncConfig_Connection
{
224 string connection_id
;
227 std::optional
<string
> region
;
228 HostStyle host_style
{PathStyle
};
230 bool has_endpoint
{false};
232 bool has_host_style
{false};
234 void init(const JSONFormattable
& config
) {
235 has_endpoint
= config
.exists("endpoint");
236 has_key
= config
.exists("access_key") || config
.exists("secret");
237 has_host_style
= config
.exists("host_style");
239 connection_id
= config
["id"];
240 endpoint
= config
["endpoint"];
242 key
= RGWAccessKey(config
["access_key"], config
["secret"]);
244 if (config
.exists("region")) {
245 region
= config
["region"];
250 string host_style_str
= config
["host_style"];
251 if (host_style_str
!= "virtual") {
252 host_style
= PathStyle
;
254 host_style
= VirtualStyle
;
257 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
258 Formatter::ObjectSection
section(jf
, "connection");
259 encode_json("id", connection_id
, &jf
);
260 encode_json("endpoint", endpoint
, &jf
);
261 string s
= (host_style
== PathStyle
? "path" : "virtual");
262 encode_json("region", region
, &jf
);
263 encode_json("host_style", s
, &jf
);
266 Formatter::ObjectSection
os(jf
, "key");
267 encode_json("access_key", key
.id
, &jf
);
268 string secret
= (key
.key
.empty() ? "" : "******");
269 encode_json("secret", secret
, &jf
);
274 static int conf_to_uint64(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
, const string
& key
, uint64_t *pval
)
277 if (config
.find(key
, &sval
)) {
279 uint64_t val
= strict_strtoll(sval
.c_str(), 10, &err
);
281 ldpp_dout(dpp
, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key
<< ": " << sval
<< dendl
;
289 struct AWSSyncConfig_S3
{
290 uint64_t multipart_sync_threshold
{DEFAULT_MULTIPART_SYNC_PART_SIZE
};
291 uint64_t multipart_min_part_size
{DEFAULT_MULTIPART_SYNC_PART_SIZE
};
293 int init(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
) {
294 int r
= conf_to_uint64(dpp
, cct
, config
, "multipart_sync_threshold", &multipart_sync_threshold
);
299 r
= conf_to_uint64(dpp
, cct
, config
, "multipart_min_part_size", &multipart_min_part_size
);
303 #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
304 if (multipart_min_part_size
< MULTIPART_MIN_POSSIBLE_PART_SIZE
) {
305 multipart_min_part_size
= MULTIPART_MIN_POSSIBLE_PART_SIZE
;
310 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
311 Formatter::ObjectSection
section(jf
, "s3");
312 encode_json("multipart_sync_threshold", multipart_sync_threshold
, &jf
);
313 encode_json("multipart_min_part_size", multipart_min_part_size
, &jf
);
317 struct AWSSyncConfig_Profile
{
318 string source_bucket
;
321 string connection_id
;
324 std::shared_ptr
<AWSSyncConfig_Connection
> conn_conf
;
325 std::shared_ptr
<ACLMappings
> acls
;
327 std::shared_ptr
<RGWRESTConn
> conn
;
329 void init(const JSONFormattable
& config
) {
330 source_bucket
= config
["source_bucket"];
332 prefix
= (!source_bucket
.empty() && source_bucket
[source_bucket
.size() - 1] == '*');
335 source_bucket
= source_bucket
.substr(0, source_bucket
.size() - 1);
338 target_path
= config
["target_path"];
339 connection_id
= config
["connection_id"];
340 acls_id
= config
["acls_id"];
342 if (config
.exists("connection")) {
343 conn_conf
= make_shared
<AWSSyncConfig_Connection
>();
344 conn_conf
->init(config
["connection"]);
347 if (config
.exists("acls")) {
348 acls
= make_shared
<ACLMappings
>();
349 acls
->init(config
["acls"]);
353 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
, const char *section
= "config") const {
354 Formatter::ObjectSection
config(jf
, section
);
355 string sb
{source_bucket
};
359 encode_json("source_bucket", sb
, &jf
);
360 encode_json("target_path", target_path
, &jf
);
361 encode_json("connection_id", connection_id
, &jf
);
362 encode_json("acls_id", acls_id
, &jf
);
363 if (conn_conf
.get()) {
364 conn_conf
->dump_conf(cct
, jf
);
367 acls
->dump_conf(cct
, jf
);
/*
 * Replace every occurrence of `find` in `src` with `replace` and store the
 * result in *dest.
 *
 * - `src` and *dest may refer to the same string; the work is done on a
 *   private copy and only assigned to *dest at the end.
 * - Text inserted by a replacement is never rescanned, so a `replace` that
 *   itself contains `find` cannot cause runaway expansion.
 * - An empty `find` pattern is treated as "nothing to replace".
 */
static void find_and_replace(const std::string& src, const std::string& find, const std::string& replace, std::string *dest)
{
  std::string s = src;

  // an empty pattern matches at every offset and would never terminate
  if (!find.empty()) {
    size_t pos = s.find(find);
    while (pos != std::string::npos) {
      s.replace(pos, find.size(), replace);
      // continue right after the inserted text: this neither skips input
      // that follows a shorter replacement nor rescans a longer one
      pos = s.find(find, pos + replace.size());
    }
  }

  *dest = s;
}
386 static void apply_meta_param(const string
& src
, const string
& param
, const string
& val
, string
*dest
)
388 string s
= string("${") + param
+ "}";
389 find_and_replace(src
, s
, val
, dest
);
393 struct AWSSyncConfig
{
394 AWSSyncConfig_Profile default_profile
;
395 std::shared_ptr
<AWSSyncConfig_Profile
> root_profile
;
397 map
<string
, std::shared_ptr
<AWSSyncConfig_Connection
> > connections
;
398 AWSSyncConfig_ACLProfiles acl_profiles
;
400 map
<string
, std::shared_ptr
<AWSSyncConfig_Profile
> > explicit_profiles
;
404 int init_profile(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& profile_conf
, AWSSyncConfig_Profile
& profile
,
405 bool connection_must_exist
) {
406 if (!profile
.connection_id
.empty()) {
407 if (profile
.conn_conf
) {
408 ldpp_dout(dpp
, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile
.connection_id
<< dendl
;
411 if (connections
.find(profile
.connection_id
) == connections
.end()) {
412 ldpp_dout(dpp
, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile
.connection_id
<< dendl
;
415 profile
.conn_conf
= connections
[profile
.connection_id
];
416 } else if (!profile
.conn_conf
) {
417 profile
.connection_id
= default_profile
.connection_id
;
418 auto i
= connections
.find(profile
.connection_id
);
419 if (i
!= connections
.end()) {
420 profile
.conn_conf
= i
->second
;
424 if (connection_must_exist
&& !profile
.conn_conf
) {
425 ldpp_dout(dpp
, 0) << "ERROR: remote connection undefined for sync profile" << dendl
;
429 if (profile
.conn_conf
&& default_profile
.conn_conf
) {
430 if (!profile
.conn_conf
->has_endpoint
) {
431 profile
.conn_conf
->endpoint
= default_profile
.conn_conf
->endpoint
;
433 if (!profile
.conn_conf
->has_host_style
) {
434 profile
.conn_conf
->host_style
= default_profile
.conn_conf
->host_style
;
436 if (!profile
.conn_conf
->has_key
) {
437 profile
.conn_conf
->key
= default_profile
.conn_conf
->key
;
441 ACLMappings acl_mappings
;
443 if (!profile
.acls_id
.empty()) {
444 if (!acl_profiles
.find(profile
.acls_id
, &acl_mappings
)) {
445 ldpp_dout(dpp
, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile
.acls_id
<< dendl
;
448 profile
.acls
= acl_profiles
.acl_profiles
[profile
.acls_id
];
449 } else if (!profile
.acls
) {
450 if (default_profile
.acls
) {
451 profile
.acls
= default_profile
.acls
;
452 profile
.acls_id
= default_profile
.acls_id
;
456 if (profile
.target_path
.empty()) {
457 profile
.target_path
= default_profile
.target_path
;
459 if (profile
.target_path
.empty()) {
460 profile
.target_path
= default_target_path
;
466 int init_target(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& profile_conf
, std::shared_ptr
<AWSSyncConfig_Profile
> *ptarget
) {
467 std::shared_ptr
<AWSSyncConfig_Profile
> profile
;
468 profile
.reset(new AWSSyncConfig_Profile
);
469 profile
->init(profile_conf
);
471 int ret
= init_profile(dpp
, cct
, profile_conf
, *profile
, true);
476 auto& sb
= profile
->source_bucket
;
478 if (explicit_profiles
.find(sb
) != explicit_profiles
.end()) {
479 ldpp_dout(dpp
, 0) << "WARNING: duplicate target configuration in sync module" << dendl
;
482 explicit_profiles
[sb
] = profile
;
489 bool do_find_profile(const rgw_bucket bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *result
) {
490 const string
& name
= bucket
.name
;
491 auto iter
= explicit_profiles
.upper_bound(name
);
492 if (iter
== explicit_profiles
.begin()) {
497 if (iter
->first
.size() > name
.size()) {
500 if (name
.compare(0, iter
->first
.size(), iter
->first
) != 0) {
504 std::shared_ptr
<AWSSyncConfig_Profile
>& target
= iter
->second
;
506 if (!target
->prefix
&&
507 name
.size() != iter
->first
.size()) {
515 void find_profile(const rgw_bucket bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *result
) {
516 if (!do_find_profile(bucket
, result
)) {
517 *result
= root_profile
;
523 int init(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
) {
524 auto& default_conf
= config
["default"];
526 if (config
.exists("default")) {
527 default_profile
.init(default_conf
);
528 init_profile(dpp
, cct
, default_conf
, default_profile
, false);
531 for (auto& conn
: config
["connections"].array()) {
532 auto new_conn
= conn
;
534 std::shared_ptr
<AWSSyncConfig_Connection
> c
{new AWSSyncConfig_Connection
};
537 connections
[new_conn
["id"]] = c
;
540 acl_profiles
.init(config
["acl_profiles"]);
542 int r
= s3
.init(dpp
, cct
, config
["s3"]);
547 auto new_root_conf
= config
;
549 r
= init_target(dpp
, cct
, new_root_conf
, &root_profile
); /* the root profile config */
554 for (auto target_conf
: config
["profiles"].array()) {
555 int r
= init_target(dpp
, cct
, target_conf
, nullptr);
561 JSONFormatter
jf(true);
566 ldpp_dout(dpp
, 5) << "sync module config (parsed representation):\n" << ss
.str() << dendl
;
571 void expand_target(RGWDataSyncCtx
*sc
, const string
& sid
, const string
& path
, string
*dest
) {
572 apply_meta_param(path
, "sid", sid
, dest
);
574 const RGWZoneGroup
& zg
= sc
->env
->svc
->zone
->get_zonegroup();
575 apply_meta_param(path
, "zonegroup", zg
.get_name(), dest
);
576 apply_meta_param(path
, "zonegroup_id", zg
.get_id(), dest
);
578 const RGWZone
& zone
= sc
->env
->svc
->zone
->get_zone();
579 apply_meta_param(path
, "zone", zone
.name
, dest
);
580 apply_meta_param(path
, "zone_id", zone
.id
, dest
);
583 void update_config(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, const string
& sid
) {
584 expand_target(sc
, sid
, root_profile
->target_path
, &root_profile
->target_path
);
585 ldpp_dout(dpp
, 20) << "updated target: (root) -> " << root_profile
->target_path
<< dendl
;
586 for (auto& t
: explicit_profiles
) {
587 expand_target(sc
, sid
, t
.second
->target_path
, &t
.second
->target_path
);
588 ldpp_dout(dpp
, 20) << "updated target: " << t
.first
<< " -> " << t
.second
->target_path
<< dendl
;
592 void dump_conf(CephContext
*cct
, JSONFormatter
& jf
) const {
593 Formatter::ObjectSection
config(jf
, "config");
594 root_profile
->dump_conf(cct
, jf
);
595 jf
.open_array_section("connections");
596 for (auto c
: connections
) {
597 c
.second
->dump_conf(cct
, jf
);
601 acl_profiles
.dump_conf(cct
, jf
);
604 Formatter::ArraySection
as(jf
, "profiles");
605 for (auto& t
: explicit_profiles
) {
606 Formatter::ObjectSection
target_section(jf
, "profile");
607 encode_json("name", t
.first
, &jf
);
608 t
.second
->dump_conf(cct
, jf
);
613 string
get_path(std::shared_ptr
<AWSSyncConfig_Profile
>& profile
,
614 const RGWBucketInfo
& bucket_info
,
615 const rgw_obj_key
& obj
) {
618 if (!bucket_info
.owner
.tenant
.empty()) {
619 bucket_str
= owner
= bucket_info
.owner
.tenant
+ "-";
620 owner
+= bucket_info
.owner
.id
;
622 bucket_str
+= bucket_info
.bucket
.name
;
624 const string
& path
= profile
->target_path
;
627 apply_meta_param(path
, "bucket", bucket_str
, &new_path
);
628 apply_meta_param(new_path
, "owner", owner
, &new_path
);
630 new_path
+= string("/") + get_key_oid(obj
);
635 void get_target(std::shared_ptr
<AWSSyncConfig_Profile
>& profile
,
636 const RGWBucketInfo
& bucket_info
,
637 const rgw_obj_key
& obj
,
640 string path
= get_path(profile
, bucket_info
, obj
);
641 size_t pos
= path
.find('/');
643 *bucket_name
= path
.substr(0, pos
);
644 *obj_name
= path
.substr(pos
+ 1);
647 void init_conns(RGWDataSyncCtx
*sc
, const string
& id
) {
648 auto sync_env
= sc
->env
;
650 update_config(sync_env
->dpp
, sc
, id
);
652 auto& root_conf
= root_profile
->conn_conf
;
654 root_profile
->conn
.reset(new S3RESTConn(sc
->cct
,
656 { root_conf
->endpoint
},
658 sync_env
->svc
->zone
->get_zonegroup().get_id(),
660 root_conf
->host_style
));
662 for (auto i
: explicit_profiles
) {
665 c
->conn
.reset(new S3RESTConn(sc
->cct
,
667 { c
->conn_conf
->endpoint
},
669 sync_env
->svc
->zone
->get_zonegroup().get_id(),
670 c
->conn_conf
->region
,
671 c
->conn_conf
->host_style
));
677 struct AWSSyncInstanceEnv
{
681 explicit AWSSyncInstanceEnv(AWSSyncConfig
& _conf
) : conf(_conf
) {}
683 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) {
685 snprintf(buf
, sizeof(buf
), "%llx", (unsigned long long)instance_id
);
688 conf
.init_conns(sc
, id
);
691 void get_profile(const rgw_bucket
& bucket
, std::shared_ptr
<AWSSyncConfig_Profile
> *ptarget
) {
692 conf
.find_profile(bucket
, ptarget
);
693 ceph_assert(ptarget
);
697 static int do_decode_rest_obj(const DoutPrefixProvider
*dpp
, CephContext
*cct
, map
<string
, bufferlist
>& attrs
, map
<string
, string
>& headers
, rgw_rest_obj
*info
)
699 for (auto header
: headers
) {
700 const string
& val
= header
.second
;
701 if (header
.first
== "RGWX_OBJECT_SIZE") {
702 info
->content_len
= atoi(val
.c_str());
704 info
->attrs
[header
.first
] = val
;
708 info
->acls
.set_ctx(cct
);
709 auto aiter
= attrs
.find(RGW_ATTR_ACL
);
710 if (aiter
!= attrs
.end()) {
711 bufferlist
& bl
= aiter
->second
;
712 auto bliter
= bl
.cbegin();
714 info
->acls
.decode(bliter
);
715 } catch (buffer::error
& err
) {
716 ldpp_dout(dpp
, 0) << "ERROR: failed to decode policy off attrs" << dendl
;
720 ldpp_dout(dpp
, 0) << "WARNING: acl attrs not provided" << dendl
;
726 class RGWRESTStreamGetCRF
: public RGWStreamReadHTTPResourceCRF
730 const rgw_obj
& src_obj
;
731 RGWRESTConn::get_obj_params req_params
;
733 rgw_sync_aws_src_obj_properties src_properties
;
735 RGWRESTStreamGetCRF(CephContext
*_cct
,
736 RGWCoroutinesEnv
*_env
,
737 RGWCoroutine
*_caller
,
740 const rgw_obj
& _src_obj
,
741 const rgw_sync_aws_src_obj_properties
& _src_properties
) : RGWStreamReadHTTPResourceCRF(_cct
, _env
, _caller
,
742 _sc
->env
->http_manager
, _src_obj
.key
),
743 sc(_sc
), conn(_conn
), src_obj(_src_obj
),
744 src_properties(_src_properties
) {
747 int init(const DoutPrefixProvider
*dpp
) override
{
748 /* init input connection */
751 req_params
.get_op
= true;
752 req_params
.prepend_metadata
= true;
754 req_params
.unmod_ptr
= &src_properties
.mtime
;
755 req_params
.etag
= src_properties
.etag
;
756 req_params
.mod_zone_id
= src_properties
.zone_short_id
;
757 req_params
.mod_pg_ver
= src_properties
.pg_ver
;
760 req_params
.range_is_set
= true;
761 req_params
.range_start
= range
.ofs
;
762 req_params
.range_end
= range
.ofs
+ range
.size
- 1;
765 RGWRESTStreamRWRequest
*in_req
;
766 int ret
= conn
->get_obj(dpp
, src_obj
, req_params
, false /* send */, &in_req
);
768 ldpp_dout(dpp
, 0) << "ERROR: " << __func__
<< "(): conn->get_obj() returned ret=" << ret
<< dendl
;
774 return RGWStreamReadHTTPResourceCRF::init(dpp
);
777 int decode_rest_obj(const DoutPrefixProvider
*dpp
, map
<string
, string
>& headers
, bufferlist
& extra_data
) override
{
778 map
<string
, bufferlist
> src_attrs
;
780 ldpp_dout(dpp
, 20) << __func__
<< ":" << " headers=" << headers
<< " extra_data.length()=" << extra_data
.length() << dendl
;
782 if (extra_data
.length() > 0) {
784 if (!jp
.parse(extra_data
.c_str(), extra_data
.length())) {
785 ldpp_dout(dpp
, 0) << "ERROR: failed to parse response extra data. len=" << extra_data
.length() << " data=" << extra_data
.c_str() << dendl
;
789 JSONDecoder::decode_json("attrs", src_attrs
, &jp
);
791 return do_decode_rest_obj(dpp
, sc
->cct
, src_attrs
, headers
, &rest_obj
);
794 bool need_extra_data() override
{
799 static std::set
<string
> keep_headers
= { "CONTENT_TYPE",
801 "CONTENT_DISPOSITION",
802 "CONTENT_LANGUAGE" };
804 class RGWAWSStreamPutCRF
: public RGWStreamWriteHTTPResourceCRF
807 rgw_sync_aws_src_obj_properties src_properties
;
808 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
809 const rgw_obj
& dest_obj
;
812 RGWAWSStreamPutCRF(CephContext
*_cct
,
813 RGWCoroutinesEnv
*_env
,
814 RGWCoroutine
*_caller
,
816 const rgw_sync_aws_src_obj_properties
& _src_properties
,
817 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
818 const rgw_obj
& _dest_obj
) : RGWStreamWriteHTTPResourceCRF(_cct
, _env
, _caller
, _sc
->env
->http_manager
),
819 sc(_sc
), src_properties(_src_properties
), target(_target
), dest_obj(_dest_obj
) {
822 int init() override
{
823 /* init output connection */
824 RGWRESTStreamS3PutObj
*out_req
{nullptr};
826 if (multipart
.is_multipart
) {
828 snprintf(buf
, sizeof(buf
), "%d", multipart
.part_num
);
829 rgw_http_param_pair params
[] = { { "uploadId", multipart
.upload_id
.c_str() },
830 { "partNumber", buf
},
831 { nullptr, nullptr } };
832 target
->conn
->put_obj_send_init(dest_obj
, params
, &out_req
);
834 target
->conn
->put_obj_send_init(dest_obj
, nullptr, &out_req
);
839 return RGWStreamWriteHTTPResourceCRF::init();
842 static bool keep_attr(const string
& h
) {
843 return (keep_headers
.find(h
) != keep_headers
.end() ||
844 boost::algorithm::starts_with(h
, "X_AMZ_"));
847 static void init_send_attrs(const DoutPrefixProvider
*dpp
,
849 const rgw_rest_obj
& rest_obj
,
850 const rgw_sync_aws_src_obj_properties
& src_properties
,
851 const AWSSyncConfig_Profile
*target
,
852 map
<string
, string
> *attrs
) {
853 auto& new_attrs
= *attrs
;
857 for (auto& hi
: rest_obj
.attrs
) {
858 if (keep_attr(hi
.first
)) {
859 new_attrs
.insert(hi
);
863 auto acl
= rest_obj
.acls
.get_acl();
865 map
<int, vector
<string
> > access_map
;
868 for (auto& grant
: acl
.get_grant_map()) {
869 auto& orig_grantee
= grant
.first
;
870 auto& perm
= grant
.second
;
874 const auto& am
= target
->acls
->acl_mappings
;
876 auto iter
= am
.find(orig_grantee
);
877 if (iter
== am
.end()) {
878 ldpp_dout(dpp
, 20) << "acl_mappings: Could not find " << orig_grantee
<< " .. ignoring" << dendl
;
882 grantee
= iter
->second
.dest_id
;
886 switch (iter
->second
.type
) {
887 case ACL_TYPE_CANON_USER
:
890 case ACL_TYPE_EMAIL_USER
:
891 type
= "emailAddress";
900 string tv
= type
+ "=" + grantee
;
902 int flags
= perm
.get_permission().get_permissions();
903 if ((flags
& RGW_PERM_FULL_CONTROL
) == RGW_PERM_FULL_CONTROL
) {
904 access_map
[flags
].push_back(tv
);
908 for (int i
= 1; i
<= RGW_PERM_WRITE_ACP
; i
<<= 1) {
910 access_map
[i
].push_back(tv
);
916 for (auto aiter
: access_map
) {
917 int grant_type
= aiter
.first
;
919 string
header_str("x-amz-grant-");
921 switch (grant_type
) {
923 header_str
.append("read");
926 header_str
.append("write");
928 case RGW_PERM_READ_ACP
:
929 header_str
.append("read-acp");
931 case RGW_PERM_WRITE_ACP
:
932 header_str
.append("write-acp");
934 case RGW_PERM_FULL_CONTROL
:
935 header_str
.append("full-control");
941 for (auto viter
: aiter
.second
) {
948 ldpp_dout(dpp
, 20) << "acl_mappings: set acl: " << header_str
<< "=" << s
<< dendl
;
950 new_attrs
[header_str
] = s
;
954 snprintf(buf
, sizeof(buf
), "%llu", (long long)src_properties
.versioned_epoch
);
955 new_attrs
["x-amz-meta-rgwx-versioned-epoch"] = buf
;
957 utime_t
ut(src_properties
.mtime
);
958 snprintf(buf
, sizeof(buf
), "%lld.%09lld",
960 (long long)ut
.nsec());
962 new_attrs
["x-amz-meta-rgwx-source-mtime"] = buf
;
963 new_attrs
["x-amz-meta-rgwx-source-etag"] = src_properties
.etag
;
964 new_attrs
["x-amz-meta-rgwx-source-key"] = rest_obj
.key
.name
;
965 if (!rest_obj
.key
.instance
.empty()) {
966 new_attrs
["x-amz-meta-rgwx-source-version-id"] = rest_obj
.key
.instance
;
970 void send_ready(const DoutPrefixProvider
*dpp
, const rgw_rest_obj
& rest_obj
) override
{
971 RGWRESTStreamS3PutObj
*r
= static_cast<RGWRESTStreamS3PutObj
*>(req
);
973 map
<string
, string
> new_attrs
;
974 if (!multipart
.is_multipart
) {
975 init_send_attrs(dpp
, sc
->cct
, rest_obj
, src_properties
, target
.get(), &new_attrs
);
978 r
->set_send_length(rest_obj
.content_len
);
980 RGWAccessControlPolicy policy
;
982 r
->send_ready(dpp
, target
->conn
->get_key(), new_attrs
, policy
);
985 void handle_headers(const map
<string
, string
>& headers
) {
986 for (auto h
: headers
) {
987 if (h
.first
== "ETAG") {
993 bool get_etag(string
*petag
) {
1003 class RGWAWSStreamObjToCloudPlainCR
: public RGWCoroutine
{
1005 RGWRESTConn
*source_conn
;
1006 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1007 const rgw_obj
& src_obj
;
1008 const rgw_obj
& dest_obj
;
1010 rgw_sync_aws_src_obj_properties src_properties
;
1012 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
1013 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
1016 RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx
*_sc
,
1017 RGWRESTConn
*_source_conn
,
1018 const rgw_obj
& _src_obj
,
1019 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1020 std::shared_ptr
<AWSSyncConfig_Profile
> _target
,
1021 const rgw_obj
& _dest_obj
) : RGWCoroutine(_sc
->cct
),
1023 source_conn(_source_conn
),
1026 dest_obj(_dest_obj
),
1027 src_properties(_src_properties
) {}
1029 int operate(const DoutPrefixProvider
*dpp
) override
{
1032 in_crf
.reset(new RGWRESTStreamGetCRF(cct
, get_env(), this, sc
,
1033 source_conn
, src_obj
,
1037 out_crf
.reset(new RGWAWSStreamPutCRF(cct
, get_env(), this, sc
,
1038 src_properties
, target
, dest_obj
));
1040 yield
call(new RGWStreamSpliceCR(cct
, sc
->env
->http_manager
, in_crf
, out_crf
));
1042 return set_cr_error(retcode
);
1045 return set_cr_done();
1052 class RGWAWSStreamObjToCloudMultipartPartCR
: public RGWCoroutine
{
1054 RGWRESTConn
*source_conn
;
1055 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1056 const rgw_obj
& src_obj
;
1057 const rgw_obj
& dest_obj
;
1059 rgw_sync_aws_src_obj_properties src_properties
;
1063 rgw_sync_aws_multipart_part_info part_info
;
1065 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
1066 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
1071 RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx
*_sc
,
1072 RGWRESTConn
*_source_conn
,
1073 const rgw_obj
& _src_obj
,
1074 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
1075 const rgw_obj
& _dest_obj
,
1076 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1077 const string
& _upload_id
,
1078 const rgw_sync_aws_multipart_part_info
& _part_info
,
1079 string
*_petag
) : RGWCoroutine(_sc
->cct
),
1081 source_conn(_source_conn
),
1084 dest_obj(_dest_obj
),
1085 src_properties(_src_properties
),
1086 upload_id(_upload_id
),
1087 part_info(_part_info
),
1090 int operate(const DoutPrefixProvider
*dpp
) override
{
1093 in_crf
.reset(new RGWRESTStreamGetCRF(cct
, get_env(), this, sc
,
1094 source_conn
, src_obj
,
1097 in_crf
->set_range(part_info
.ofs
, part_info
.size
);
1100 out_crf
.reset(new RGWAWSStreamPutCRF(cct
, get_env(), this, sc
,
1101 src_properties
, target
, dest_obj
));
1103 out_crf
->set_multipart(upload_id
, part_info
.part_num
, part_info
.size
);
1105 yield
call(new RGWStreamSpliceCR(cct
, sc
->env
->http_manager
, in_crf
, out_crf
));
1107 return set_cr_error(retcode
);
1110 if (!(static_cast<RGWAWSStreamPutCRF
*>(out_crf
.get()))->get_etag(petag
)) {
1111 ldpp_dout(dpp
, 0) << "ERROR: failed to get etag from PUT request" << dendl
;
1112 return set_cr_error(-EIO
);
1115 return set_cr_done();
1122 class RGWAWSAbortMultipartCR
: public RGWCoroutine
{
1124 RGWRESTConn
*dest_conn
;
1125 const rgw_obj
& dest_obj
;
1130 RGWAWSAbortMultipartCR(RGWDataSyncCtx
*_sc
,
1131 RGWRESTConn
*_dest_conn
,
1132 const rgw_obj
& _dest_obj
,
1133 const string
& _upload_id
) : RGWCoroutine(_sc
->cct
),
1135 dest_conn(_dest_conn
),
1136 dest_obj(_dest_obj
),
1137 upload_id(_upload_id
) {}
1139 int operate(const DoutPrefixProvider
*dpp
) override
{
1143 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
1145 call(new RGWDeleteRESTResourceCR(sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1146 obj_to_aws_path(dest_obj
), params
));
1150 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj
<< " (retcode=" << retcode
<< ")" << dendl
;
1151 return set_cr_error(retcode
);
1154 return set_cr_done();
1161 class RGWAWSInitMultipartCR
: public RGWCoroutine
{
1163 RGWRESTConn
*dest_conn
;
1164 const rgw_obj
& dest_obj
;
1167 map
<string
, string
> attrs
;
1173 struct InitMultipartResult
{
1178 void decode_xml(XMLObj
*obj
) {
1179 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
1180 RGWXMLDecoder::decode_xml("Key", key
, obj
);
1181 RGWXMLDecoder::decode_xml("UploadId", upload_id
, obj
);
1186 RGWAWSInitMultipartCR(RGWDataSyncCtx
*_sc
,
1187 RGWRESTConn
*_dest_conn
,
1188 const rgw_obj
& _dest_obj
,
1190 const map
<string
, string
>& _attrs
,
1191 string
*_upload_id
) : RGWCoroutine(_sc
->cct
),
1193 dest_conn(_dest_conn
),
1194 dest_obj(_dest_obj
),
1195 obj_size(_obj_size
),
1197 upload_id(_upload_id
) {}
1199 int operate(const DoutPrefixProvider
*dpp
) override
{
1203 rgw_http_param_pair params
[] = { { "uploads", nullptr }, {nullptr, nullptr} };
1205 call(new RGWPostRawRESTResourceCR
<bufferlist
> (sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1206 obj_to_aws_path(dest_obj
), params
, &attrs
, bl
, &out_bl
));
1210 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
1211 return set_cr_error(retcode
);
1215 * If one of the following fails we cannot abort upload, as we cannot
1216 * extract the upload id. If one of these fail it's very likely that that's
1217 * the least of our problem.
1219 RGWXMLDecoder::XMLParser parser
;
1220 if (!parser
.init()) {
1221 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1222 return set_cr_error(-EIO
);
1225 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1226 string
str(out_bl
.c_str(), out_bl
.length());
1227 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1228 return set_cr_error(-EIO
);
1232 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result
, &parser
, true);
1233 } catch (RGWXMLDecoder::err
& err
) {
1234 string
str(out_bl
.c_str(), out_bl
.length());
1235 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1236 return set_cr_error(-EIO
);
1240 ldpp_dout(dpp
, 20) << "init multipart result: bucket=" << result
.bucket
<< " key=" << result
.key
<< " upload_id=" << result
.upload_id
<< dendl
;
1242 *upload_id
= result
.upload_id
;
1244 return set_cr_done();
1251 class RGWAWSCompleteMultipartCR
: public RGWCoroutine
{
1253 RGWRESTConn
*dest_conn
;
1254 const rgw_obj
& dest_obj
;
1260 struct CompleteMultipartReq
{
1261 map
<int, rgw_sync_aws_multipart_part_info
> parts
;
1263 explicit CompleteMultipartReq(const map
<int, rgw_sync_aws_multipart_part_info
>& _parts
) : parts(_parts
) {}
1265 void dump_xml(Formatter
*f
) const {
1266 for (auto p
: parts
) {
1267 f
->open_object_section("Part");
1268 encode_xml("PartNumber", p
.first
, f
);
1269 encode_xml("ETag", p
.second
.etag
, f
);
1275 struct CompleteMultipartResult
{
1281 void decode_xml(XMLObj
*obj
) {
1282 RGWXMLDecoder::decode_xml("Location", bucket
, obj
);
1283 RGWXMLDecoder::decode_xml("Bucket", bucket
, obj
);
1284 RGWXMLDecoder::decode_xml("Key", key
, obj
);
1285 RGWXMLDecoder::decode_xml("ETag", etag
, obj
);
1290 RGWAWSCompleteMultipartCR(RGWDataSyncCtx
*_sc
,
1291 RGWRESTConn
*_dest_conn
,
1292 const rgw_obj
& _dest_obj
,
1294 const map
<int, rgw_sync_aws_multipart_part_info
>& _parts
) : RGWCoroutine(_sc
->cct
),
1296 dest_conn(_dest_conn
),
1297 dest_obj(_dest_obj
),
1298 upload_id(_upload_id
),
1301 int operate(const DoutPrefixProvider
*dpp
) override
{
1305 rgw_http_param_pair params
[] = { { "uploadId", upload_id
.c_str() }, {nullptr, nullptr} };
1307 XMLFormatter formatter
;
1309 encode_xml("CompleteMultipartUpload", req_enc
, &formatter
);
1311 formatter
.flush(ss
);
1314 bl
.append(ss
.str());
1316 call(new RGWPostRawRESTResourceCR
<bufferlist
> (sc
->cct
, dest_conn
, sc
->env
->http_manager
,
1317 obj_to_aws_path(dest_obj
), params
, nullptr, bl
, &out_bl
));
1321 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj
<< dendl
;
1322 return set_cr_error(retcode
);
1326 * If one of the following fails we cannot abort upload, as we cannot
1327 * extract the upload id. If one of these fail it's very likely that that's
1328 * the least of our problem.
1330 RGWXMLDecoder::XMLParser parser
;
1331 if (!parser
.init()) {
1332 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1333 return set_cr_error(-EIO
);
1336 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1337 string
str(out_bl
.c_str(), out_bl
.length());
1338 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1339 return set_cr_error(-EIO
);
1343 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result
, &parser
, true);
1344 } catch (RGWXMLDecoder::err
& err
) {
1345 string
str(out_bl
.c_str(), out_bl
.length());
1346 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1347 return set_cr_error(-EIO
);
1351 ldpp_dout(dpp
, 20) << "complete multipart result: location=" << result
.location
<< " bucket=" << result
.bucket
<< " key=" << result
.key
<< " etag=" << result
.etag
<< dendl
;
1353 return set_cr_done();
// Coroutine that tears down an in-progress multipart upload: it first runs
// RGWAWSAbortMultipartCR against the destination (dest_conn, dest_obj,
// upload_id) and then removes the local sync status object (status_obj).
// Both steps are best effort: failures are logged and otherwise ignored, and
// the coroutine finishes with set_cr_done().
// NOTE(review): the retcode checks guarding the two error logs are not
// visible in this excerpt - confirm against the full file.
1361 class RGWAWSStreamAbortMultipartUploadCR
: public RGWCoroutine
{
1363 RGWRESTConn
*dest_conn
;
1364 const rgw_obj
& dest_obj
;
1365 const rgw_raw_obj status_obj
;
// Stores the sync context, destination connection/object, a copy of the
// status object and the upload id for use in operate().
1371 RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx
*_sc
,
1372 RGWRESTConn
*_dest_conn
,
1373 const rgw_obj
& _dest_obj
,
1374 const rgw_raw_obj
& _status_obj
,
1375 const string
& _upload_id
) : RGWCoroutine(_sc
->cct
), sc(_sc
),
1376 dest_conn(_dest_conn
),
1377 dest_obj(_dest_obj
),
1378 status_obj(_status_obj
),
1379 upload_id(_upload_id
) {}
1381 int operate(const DoutPrefixProvider
*dpp
) override
{
// step 1: abort the multipart upload on the destination
1383 yield
call(new RGWAWSAbortMultipartCR(sc
, dest_conn
, dest_obj
, upload_id
));
1385 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj
<< " upload_id=" << upload_id
<< " retcode=" << retcode
<< dendl
;
1386 /* ignore error, best effort */
// step 2: drop the status object that tracked the upload's progress
1388 yield
call(new RGWRadosRemoveCR(sc
->env
->driver
, status_obj
));
1390 ldpp_dout(dpp
, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj
<< " retcode=" << retcode
<< dendl
;
1391 /* ignore error, best effort */
1393 return set_cr_done();
1400 class RGWAWSStreamObjToCloudMultipartCR
: public RGWCoroutine
{
1402 RGWDataSyncEnv
*sync_env
;
1403 AWSSyncConfig
& conf
;
1404 RGWRESTConn
*source_conn
;
1405 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1406 const rgw_obj
& src_obj
;
1407 const rgw_obj
& dest_obj
;
1411 rgw_sync_aws_src_obj_properties src_properties
;
1412 rgw_rest_obj rest_obj
;
1414 rgw_sync_aws_multipart_upload_info status
;
1416 map
<string
, string
> new_attrs
;
1418 rgw_sync_aws_multipart_part_info
*pcur_part_info
{nullptr};
1422 rgw_raw_obj status_obj
;
1425 RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx
*_sc
,
1426 rgw_bucket_sync_pipe
& _sync_pipe
,
1427 AWSSyncConfig
& _conf
,
1428 RGWRESTConn
*_source_conn
,
1429 const rgw_obj
& _src_obj
,
1430 std::shared_ptr
<AWSSyncConfig_Profile
>& _target
,
1431 const rgw_obj
& _dest_obj
,
1433 const rgw_sync_aws_src_obj_properties
& _src_properties
,
1434 const rgw_rest_obj
& _rest_obj
) : RGWCoroutine(_sc
->cct
),
1438 source_conn(_source_conn
),
1441 dest_obj(_dest_obj
),
1442 obj_size(_obj_size
),
1443 src_properties(_src_properties
),
1444 rest_obj(_rest_obj
),
1445 status_obj(sync_env
->svc
->zone
->get_zone_params().log_pool
,
1446 RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe
, sc
->source_zone
, src_obj
)) {
1450 int operate(const DoutPrefixProvider
*dpp
) override
{
1452 yield
call(new RGWSimpleRadosReadCR
<rgw_sync_aws_multipart_upload_info
>(
1453 dpp
, sync_env
->driver
, status_obj
, &status
, false));
1455 if (retcode
< 0 && retcode
!= -ENOENT
) {
1456 ldpp_dout(dpp
, 0) << "ERROR: failed to read sync status of object " << src_obj
<< " retcode=" << retcode
<< dendl
;
1461 /* check here that mtime and size did not change */
1463 if (status
.src_properties
.mtime
!= src_properties
.mtime
|| status
.obj_size
!= obj_size
||
1464 status
.src_properties
.etag
!= src_properties
.etag
) {
1465 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1470 if (retcode
== -ENOENT
) {
1471 RGWAWSStreamPutCRF::init_send_attrs(dpp
, sc
->cct
, rest_obj
, src_properties
, target
.get(), &new_attrs
);
1473 yield
call(new RGWAWSInitMultipartCR(sc
, target
->conn
.get(), dest_obj
, status
.obj_size
, std::move(new_attrs
), &status
.upload_id
));
1475 return set_cr_error(retcode
);
1478 status
.obj_size
= obj_size
;
1479 status
.src_properties
= src_properties
;
1480 #define MULTIPART_MAX_PARTS 10000
1481 uint64_t min_part_size
= obj_size
/ MULTIPART_MAX_PARTS
;
1482 status
.part_size
= std::max(conf
.s3
.multipart_min_part_size
, min_part_size
);
1483 status
.num_parts
= (obj_size
+ status
.part_size
- 1) / status
.part_size
;
1484 status
.cur_part
= 1;
1487 for (; (uint32_t)status
.cur_part
<= status
.num_parts
; ++status
.cur_part
) {
1489 rgw_sync_aws_multipart_part_info
& cur_part_info
= status
.parts
[status
.cur_part
];
1490 cur_part_info
.part_num
= status
.cur_part
;
1491 cur_part_info
.ofs
= status
.cur_ofs
;
1492 cur_part_info
.size
= std::min((uint64_t)status
.part_size
, status
.obj_size
- status
.cur_ofs
);
1494 pcur_part_info
= &cur_part_info
;
1496 status
.cur_ofs
+= status
.part_size
;
1498 call(new RGWAWSStreamObjToCloudMultipartPartCR(sc
,
1499 source_conn
, src_obj
,
1502 status
.src_properties
,
1505 &cur_part_info
.etag
));
1509 ldpp_dout(dpp
, 0) << "ERROR: failed to sync obj=" << src_obj
<< ", sync via multipart upload, upload_id=" << status
.upload_id
<< " part number " << status
.cur_part
<< " (error: " << cpp_strerror(-retcode
) << ")" << dendl
;
1511 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1512 return set_cr_error(ret_err
);
1515 yield
call(new RGWSimpleRadosWriteCR
<rgw_sync_aws_multipart_upload_info
>(dpp
, sync_env
->driver
, status_obj
, status
));
1517 ldpp_dout(dpp
, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode
<< dendl
;
1518 /* continue with upload anyway */
1520 ldpp_dout(dpp
, 20) << "sync of object=" << src_obj
<< " via multipart upload, finished sending part #" << status
.cur_part
<< " etag=" << pcur_part_info
->etag
<< dendl
;
1523 yield
call(new RGWAWSCompleteMultipartCR(sc
, target
->conn
.get(), dest_obj
, status
.upload_id
, status
.parts
));
1525 ldpp_dout(dpp
, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj
<< " (error: " << cpp_strerror(-retcode
) << ")" << dendl
;
1527 yield
call(new RGWAWSStreamAbortMultipartUploadCR(sc
, target
->conn
.get(), dest_obj
, status_obj
, status
.upload_id
));
1528 return set_cr_error(ret_err
);
1531 /* remove status obj */
1532 yield
call(new RGWRadosRemoveCR(sync_env
->driver
, status_obj
));
1534 ldpp_dout(dpp
, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj
<< " upload_id=" << status
.upload_id
<< " part number " << status
.cur_part
<< " (" << cpp_strerror(-retcode
) << ")" << dendl
;
1535 /* ignore error, best effort */
1537 return set_cr_done();
// Looks up attr_name in attrs and decodes its buffer into *result.
// The lookup distinguishes three cases: attribute absent, attribute present
// but empty (length 0), and attribute present with data, which is decoded
// from the start of the buffer; a buffer::error raised by decode() is caught.
// NOTE(review): the bodies of the absent/empty branches, the catch handler
// and the return statements are not visible in this excerpt; presumably
// def_val is stored in *result when there is nothing to decode - confirm.
1544 int decode_attr(map
<string
, bufferlist
>& attrs
, const char *attr_name
, T
*result
, T def_val
)
1546 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
// case 1: attribute not present
1547 if (iter
== attrs
.end()) {
1551 bufferlist
& bl
= iter
->second
;
// case 2: attribute present but holds no data
1552 if (bl
.length() == 0) {
// case 3: decode the stored value
1556 auto bliter
= bl
.cbegin();
1558 decode(*result
, bliter
);
1559 } catch (buffer::error
& err
) {
1565 // maybe use Fetch Remote Obj instead?
// Callback coroutine run after the source object has been stat'ed; it pushes
// that object to the cloud target. As far as this excerpt shows, operate():
//  - decodes the pg version and source zone short id out of the object attrs
//    (decode failures are logged and ignored)
//  - looks up the REST connection to the source zone (-EINVAL if missing)
//  - resolves the target profile and the destination bucket/object names
//  - issues a PUT to create the destination bucket if its name is not yet in
//    bucket_created; an error reply is parsed and only the code
//    "BucketAlreadyOwnedByYou" is tolerated
//  - uploads the object via RGWAWSStreamObjToCloudPlainCR when size is below
//    conf.s3.multipart_sync_threshold, otherwise via
//    RGWAWSStreamObjToCloudMultipartCR
// NOTE(review): several guard lines (retcode / ret checks, yield blocks) are
// not visible in this excerpt - confirm against the full file.
1566 class RGWAWSHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
1567 rgw_bucket_sync_pipe sync_pipe
;
1568 AWSSyncInstanceEnv
& instance
;
1570 uint64_t versioned_epoch
{0};
1572 RGWRESTConn
*source_conn
{nullptr};
1573 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
// destination bucket names for which creation was already attempted
1575 unordered_map
<string
, bool> bucket_created
;
1576 rgw_rest_obj rest_obj
;
1579 uint32_t src_zone_short_id
{0};
1580 uint64_t src_pg_ver
{0};
// decoded body of an error reply to the bucket creation request
1584 struct CreateBucketResult
{
1587 void decode_xml(XMLObj
*obj
) {
1588 RGWXMLDecoder::decode_xml("Code", code
, obj
);
1596 RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1597 rgw_bucket_sync_pipe
& _sync_pipe
,
1599 AWSSyncInstanceEnv
& _instance
,
1600 uint64_t _versioned_epoch
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1601 sync_pipe(_sync_pipe
),
1602 instance(_instance
), versioned_epoch(_versioned_epoch
)
1605 ~RGWAWSHandleRemoteObjCBCR(){
1608 int operate(const DoutPrefixProvider
*dpp
) override
{
// pull replication metadata out of the source object's attrs
1610 ret
= decode_attr(attrs
, RGW_ATTR_PG_VER
, &src_pg_ver
, (uint64_t)0);
1612 ldpp_dout(dpp
, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl
;
1614 ret
= decode_attr(attrs
, RGW_ATTR_SOURCE_ZONE
, &src_zone_short_id
, (uint32_t)0);
1616 ldpp_dout(dpp
, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl
;
1617 src_pg_ver
= 0; /* all or nothing */
1620 ldpp_dout(dpp
, 4) << "AWS: download begin: z=" << sc
->source_zone
1621 << " b=" << src_bucket
<< " k=" << key
<< " size=" << size
1622 << " mtime=" << mtime
<< " etag=" << etag
1623 << " zone_short_id=" << src_zone_short_id
<< " pg_ver=" << src_pg_ver
1626 source_conn
= sync_env
->svc
->zone
->get_zone_conn(sc
->source_zone
);
1628 ldpp_dout(dpp
, 0) << "ERROR: cannot find http connection to zone " << sc
->source_zone
<< dendl
;
1629 return set_cr_error(-EINVAL
);
// resolve where this object goes on the target
1632 instance
.get_profile(sync_pipe
.info
.source_bs
.bucket
, &target
);
1633 instance
.conf
.get_target(target
, sync_pipe
.dest_bucket_info
, key
, &dest_obj
.bucket
.name
, &dest_obj
.key
.name
);
// create the destination bucket unless it was already handled
1635 if (bucket_created
.find(dest_obj
.bucket
.name
) == bucket_created
.end()){
1637 ldpp_dout(dpp
, 0) << "AWS: creating bucket " << dest_obj
.bucket
.name
<< dendl
;
1639 call(new RGWPutRawRESTResourceCR
<bufferlist
> (sc
->cct
, target
->conn
.get(),
1640 sync_env
->http_manager
,
1641 dest_obj
.bucket
.name
, nullptr, bl
, &out_bl
));
// parse the error reply to find out why bucket creation failed
1644 RGWXMLDecoder::XMLParser parser
;
1645 if (!parser
.init()) {
1646 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl
;
1647 return set_cr_error(retcode
);
1650 if (!parser
.parse(out_bl
.c_str(), out_bl
.length(), 1)) {
1651 string
str(out_bl
.c_str(), out_bl
.length());
1652 ldpp_dout(dpp
, 5) << "ERROR: failed to parse xml: " << str
<< dendl
;
1653 return set_cr_error(retcode
);
1657 RGWXMLDecoder::decode_xml("Error", result
, &parser
, true);
1658 } catch (RGWXMLDecoder::err
& err
) {
1659 string
str(out_bl
.c_str(), out_bl
.length());
1660 ldpp_dout(dpp
, 5) << "ERROR: unexpected xml: " << str
<< dendl
;
1661 return set_cr_error(retcode
);
// a bucket we already own is not an error; anything else is
1664 if (result
.code
!= "BucketAlreadyOwnedByYou") {
1665 return set_cr_error(retcode
);
1669 bucket_created
[dest_obj
.bucket
.name
] = true;
1673 src_obj
.bucket
= src_bucket
;
// source object properties handed to the upload coroutines
1677 rgw_sync_aws_src_obj_properties src_properties
;
1678 src_properties
.mtime
= mtime
;
1679 src_properties
.etag
= etag
;
1680 src_properties
.zone_short_id
= src_zone_short_id
;
1681 src_properties
.pg_ver
= src_pg_ver
;
1682 src_properties
.versioned_epoch
= versioned_epoch
;
// small objects go up in a single request, larger ones via multipart upload
1684 if (size
< instance
.conf
.s3
.multipart_sync_threshold
) {
1685 call(new RGWAWSStreamObjToCloudPlainCR(sc
, source_conn
, src_obj
,
1690 rgw_rest_obj rest_obj
;
1692 if (do_decode_rest_obj(dpp
, sc
->cct
, attrs
, headers
, &rest_obj
)) {
1693 ldpp_dout(dpp
, 0) << "ERROR: failed to decode rest obj out of headers=" << headers
<< ", attrs=" << attrs
<< dendl
;
1694 return set_cr_error(-EINVAL
);
1696 call(new RGWAWSStreamObjToCloudMultipartCR(sc
, sync_pipe
, instance
.conf
, source_conn
, src_obj
,
1697 target
, dest_obj
, size
, src_properties
, rest_obj
));
1701 return set_cr_error(retcode
);
1704 return set_cr_done();
// RGWCallStatRemoteObjCR specialization for the AWS sync module: it keeps
// the sync pipe, the module instance environment and the versioned epoch,
// and hands them to the RGWAWSHandleRemoteObjCBCR callback it allocates.
1711 class RGWAWSHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
1712 rgw_bucket_sync_pipe sync_pipe
;
1713 AWSSyncInstanceEnv
& instance
;
1714 uint64_t versioned_epoch
;
// The base class is constructed with the pipe's source bucket and _key.
1716 RGWAWSHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
1717 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1718 AWSSyncInstanceEnv
& _instance
, uint64_t _versioned_epoch
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1719 sync_pipe(_sync_pipe
),
1720 instance(_instance
), versioned_epoch(_versioned_epoch
) {
1723 ~RGWAWSHandleRemoteObjCR() {}
// Returns a newly allocated callback coroutine for this object.
1725 RGWStatRemoteObjCBCR
*allocate_callback() override
{
1726 return new RGWAWSHandleRemoteObjCBCR(sc
, sync_pipe
, key
, instance
, versioned_epoch
);
1730 class RGWAWSRemoveRemoteObjCBCR
: public RGWCoroutine
{
1732 std::shared_ptr
<AWSSyncConfig_Profile
> target
;
1733 rgw_bucket_sync_pipe sync_pipe
;
1735 ceph::real_time mtime
;
1736 AWSSyncInstanceEnv
& instance
;
1739 RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1740 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
1741 AWSSyncInstanceEnv
& _instance
) : RGWCoroutine(_sc
->cct
), sc(_sc
),
1742 sync_pipe(_sync_pipe
), key(_key
),
1743 mtime(_mtime
), instance(_instance
) {}
1744 int operate(const DoutPrefixProvider
*dpp
) override
{
1746 ldpp_dout(dpp
, 0) << ": remove remote obj: z=" << sc
->source_zone
1747 << " b=" <<sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
1749 instance
.get_profile(sync_pipe
.info
.source_bs
.bucket
, &target
);
1750 string path
= instance
.conf
.get_path(target
, sync_pipe
.dest_bucket_info
, key
);
1751 ldpp_dout(dpp
, 0) << "AWS: removing aws object at" << path
<< dendl
;
1753 call(new RGWDeleteRESTResourceCR(sc
->cct
, target
->conn
.get(),
1754 sc
->env
->http_manager
,
1755 path
, nullptr /* params */));
1758 return set_cr_error(retcode
);
1760 return set_cr_done();
// Data sync handler of the AWS sync module. It owns the module's
// AWSSyncInstanceEnv and turns sync events into coroutines:
//  - sync_object          -> RGWAWSHandleRemoteObjCR
//  - remove_object        -> RGWAWSRemoveRemoteObjCBCR
//  - create_delete_marker -> only logged as "AWS Not implemented"
1768 class RGWAWSDataSyncModule
: public RGWDataSyncModule
{
1770 AWSSyncInstanceEnv instance
;
1772 RGWAWSDataSyncModule(CephContext
*_cct
, AWSSyncConfig
& _conf
) :
// Forwards the sync context and instance id to the instance environment.
1777 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
1778 instance
.init(sc
, instance_id
);
1781 ~RGWAWSDataSyncModule() {}
// Logs the request and returns a new coroutine that stats the source object
// and syncs it to the target; a missing versioned_epoch is passed on as 0.
1783 RGWCoroutine
*sync_object(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
,
1784 std::optional
<uint64_t> versioned_epoch
,
1785 const rgw_zone_set_entry
& source_trace_entry
,
1786 rgw_zone_set
*zones_trace
) override
{
1787 ldout(sc
->cct
, 0) << instance
.id
<< ": sync_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
1788 return new RGWAWSHandleRemoteObjCR(sc
, sync_pipe
, key
, instance
, versioned_epoch
.value_or(0));
// Logs the request and returns a new coroutine that deletes the object from
// the target; versioned and versioned_epoch are only logged here.
1790 RGWCoroutine
*remove_object(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
,
1791 rgw_zone_set
*zones_trace
) override
{
1792 ldout(sc
->cct
, 0) <<"rm_object: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1793 return new RGWAWSRemoveRemoteObjCBCR(sc
, sync_pipe
, key
, mtime
, instance
);
// Delete markers are not supported by this module; the request is logged.
// NOTE(review): the return statement is not visible in this excerpt.
1795 RGWCoroutine
*create_delete_marker(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
, rgw_obj_key
& key
, real_time
& mtime
,
1796 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
,
1797 rgw_zone_set
*zones_trace
) override
{
1798 ldout(sc
->cct
, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " mtime=" << mtime
1799 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
// Sync module instance for the AWS module: owns one RGWAWSDataSyncModule,
// built from the given CephContext and configuration, and exposes it as the
// data handler.
1804 class RGWAWSSyncModuleInstance
: public RGWSyncModuleInstance
{
1805 RGWAWSDataSyncModule data_handler
;
1807 RGWAWSSyncModuleInstance(CephContext
*cct
, AWSSyncConfig
& _conf
) : data_handler(cct
, _conf
) {}
// Returns a pointer to the data handler owned by this instance.
1808 RGWDataSyncModule
*get_data_handler() override
{
1809 return &data_handler
;
1813 int RGWAWSSyncModule::create_instance(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
){
1816 int r
= conf
.init(dpp
, cct
, config
);
1821 instance
->reset(new RGWAWSSyncModuleInstance(cct
, conf
));