]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_aws.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rgw / rgw_sync_module_aws.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/errno.h"
5
6 #include "rgw_common.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_sync_module.h"
9 #include "rgw_data_sync.h"
10 #include "rgw_sync_module_aws.h"
11 #include "rgw_cr_rados.h"
12 #include "rgw_rest_conn.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_acl.h"
15 #include "rgw_zone.h"
16
17 #include "services/svc_zone.h"
18
19 #include <boost/asio/yield.hpp>
20
21 #define dout_subsys ceph_subsys_rgw
22
23
24 #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
25
26 using namespace std;
27
// Target path template used when neither a profile nor the "default" section
// supplies one; the ${...} placeholders are substituted later.
static std::string default_target_path{"rgw-${zonegroup}-${sid}/${bucket}"};
29
30 static string get_key_oid(const rgw_obj_key& key)
31 {
32 string oid = key.name;
33 if (!key.instance.empty() &&
34 !key.have_null_instance()) {
35 oid += string(":") + key.instance;
36 }
37 return oid;
38 }
39
40 static string obj_to_aws_path(rgw::sal::Object* obj)
41 {
42 string path = obj->get_bucket()->get_name() + "/" + get_key_oid(obj->get_key());
43
44
45 return path;
46 }
47
48 /*
49
50 json configuration definition:
51
52 {
53 "connection": {
54 "access_key": <access>,
55 "secret": <secret>,
56 "endpoint": <endpoint>,
57 "host_style": <path | virtual>,
58 },
59 "acls": [ { "type": <id | email | uri>,
60 "source_id": <source_id>,
61 "dest_id": <dest_id> } ... ], # optional, acl mappings, no mappings if does not exist
62 "target_path": <target_path>, # override default
63
64
65 # anything below here is for non trivial configuration
66 # can be used in conjunction with the above
67
68 "default": {
69 "connection": {
70 "access_key": <access>,
71 "secret": <secret>,
72 "endpoint": <endpoint>,
73 "host_style": <path | virtual>,
74 },
75 "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
76 {
77 "type" : <id | email | uri>, # optional, default is id
78 "source_id": <id>,
79 "dest_id": <id>
80 } ... ]
81 "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
82 # final object name will be target_path + "/" + obj
83 },
84 "connections": [
85 {
86 "id": <id>,
87 "access_key": <access>,
88 "secret": <secret>,
89 "endpoint": <endpoint>,
90 } ... ],
91 "acl_profiles": [
92 {
93 "id": <id>, # acl mappings
94 "acls": [ {
95 "type": <id | email | uri>,
96 "source_id": <id>,
97 "dest_id": <id>
98 } ... ]
99 }
100 ],
101 "profiles": [
102 {
103 "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
104 "target_path": <dest>, # (override default)
105 "connection_id": <connection_id>, # optional, if empty references default connection
106 "acls_id": <mappings_id>, # optional, if empty references default mappings
107 } ... ],
108 }
109
110 target path optional variables:
111
112 (evaluated at init)
113 sid: sync instance id, randomly generated by sync process on first sync initialization
114 zonegroup: zonegroup name
115 zonegroup_id: zonegroup id
116 zone: zone name
117 zone_id: zone id
118
119 (evaluated when syncing)
120 bucket: bucket name
121 owner: bucket owner
122
123 */
124
125 struct ACLMapping {
126 ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
127 string source_id;
128 string dest_id;
129
130 ACLMapping() = default;
131
132 ACLMapping(ACLGranteeTypeEnum t,
133 const string& s,
134 const string& d) : type(t),
135 source_id(s),
136 dest_id(d) {}
137
138 void init(const JSONFormattable& config) {
139 const string& t = config["type"];
140
141 if (t == "email") {
142 type = ACL_TYPE_EMAIL_USER;
143 } else if (t == "uri") {
144 type = ACL_TYPE_GROUP;
145 } else {
146 type = ACL_TYPE_CANON_USER;
147 }
148
149 source_id = config["source_id"];
150 dest_id = config["dest_id"];
151 }
152
153 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
154 Formatter::ObjectSection os(jf, "acl_mapping");
155 string s;
156 switch (type) {
157 case ACL_TYPE_EMAIL_USER:
158 s = "email";
159 break;
160 case ACL_TYPE_GROUP:
161 s = "uri";
162 break;
163 default:
164 s = "id";
165 break;
166 }
167 encode_json("type", s, &jf);
168 encode_json("source_id", source_id, &jf);
169 encode_json("dest_id", dest_id, &jf);
170 }
171 };
172
173 struct ACLMappings {
174 map<string, ACLMapping> acl_mappings;
175
176 void init(const JSONFormattable& config) {
177 for (auto& c : config.array()) {
178 ACLMapping m;
179 m.init(c);
180
181 acl_mappings.emplace(std::make_pair(m.source_id, m));
182 }
183 }
184 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
185 Formatter::ArraySection os(jf, "acls");
186
187 for (auto& i : acl_mappings) {
188 i.second.dump_conf(cct, jf);
189 }
190 }
191 };
192
193 struct AWSSyncConfig_ACLProfiles {
194 map<string, std::shared_ptr<ACLMappings> > acl_profiles;
195
196 void init(const JSONFormattable& config) {
197 for (auto& c : config.array()) {
198 const string& profile_id = c["id"];
199
200 std::shared_ptr<ACLMappings> ap{new ACLMappings};
201 ap->init(c["acls"]);
202
203 acl_profiles[profile_id] = ap;
204 }
205 }
206 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
207 Formatter::ArraySection section(jf, "acl_profiles");
208
209 for (auto& p : acl_profiles) {
210 Formatter::ObjectSection section(jf, "profile");
211 encode_json("id", p.first, &jf);
212 p.second->dump_conf(cct, jf);
213 }
214 }
215
216 bool find(const string& profile_id, ACLMappings *result) const {
217 auto iter = acl_profiles.find(profile_id);
218 if (iter == acl_profiles.end()) {
219 return false;
220 }
221 *result = *iter->second;
222 return true;
223 }
224 };
225
226 struct AWSSyncConfig_Connection {
227 string connection_id;
228 string endpoint;
229 RGWAccessKey key;
230 std::optional<string> region;
231 HostStyle host_style{PathStyle};
232
233 bool has_endpoint{false};
234 bool has_key{false};
235 bool has_host_style{false};
236
237 void init(const JSONFormattable& config) {
238 has_endpoint = config.exists("endpoint");
239 has_key = config.exists("access_key") || config.exists("secret");
240 has_host_style = config.exists("host_style");
241
242 connection_id = config["id"];
243 endpoint = config["endpoint"];
244
245 key = RGWAccessKey(config["access_key"], config["secret"]);
246
247 if (config.exists("region")) {
248 region = config["region"];
249 } else {
250 region.reset();
251 }
252
253 string host_style_str = config["host_style"];
254 if (host_style_str != "virtual") {
255 host_style = PathStyle;
256 } else {
257 host_style = VirtualStyle;
258 }
259 }
260 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
261 Formatter::ObjectSection section(jf, "connection");
262 encode_json("id", connection_id, &jf);
263 encode_json("endpoint", endpoint, &jf);
264 string s = (host_style == PathStyle ? "path" : "virtual");
265 encode_json("region", region, &jf);
266 encode_json("host_style", s, &jf);
267
268 {
269 Formatter::ObjectSection os(jf, "key");
270 encode_json("access_key", key.id, &jf);
271 string secret = (key.key.empty() ? "" : "******");
272 encode_json("secret", secret, &jf);
273 }
274 }
275 };
276
277 static int conf_to_uint64(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
278 {
279 string sval;
280 if (config.find(key, &sval)) {
281 string err;
282 uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
283 if (!err.empty()) {
284 ldpp_dout(dpp, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
285 return -EINVAL;
286 }
287 *pval = val;
288 }
289 return 0;
290 }
291
292 struct AWSSyncConfig_S3 {
293 uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
294 uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
295
296 int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
297 int r = conf_to_uint64(dpp, cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
298 if (r < 0) {
299 return r;
300 }
301
302 r = conf_to_uint64(dpp, cct, config, "multipart_min_part_size", &multipart_min_part_size);
303 if (r < 0) {
304 return r;
305 }
306 #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
307 if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
308 multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
309 }
310 return 0;
311 }
312
313 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
314 Formatter::ObjectSection section(jf, "s3");
315 encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf);
316 encode_json("multipart_min_part_size", multipart_min_part_size, &jf);
317 }
318 };
319
320 struct AWSSyncConfig_Profile {
321 string source_bucket;
322 bool prefix{false};
323 string target_path;
324 string connection_id;
325 string acls_id;
326
327 std::shared_ptr<AWSSyncConfig_Connection> conn_conf;
328 std::shared_ptr<ACLMappings> acls;
329
330 std::shared_ptr<RGWRESTConn> conn;
331
332 void init(const JSONFormattable& config) {
333 source_bucket = config["source_bucket"];
334
335 prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*');
336
337 if (prefix) {
338 source_bucket = source_bucket.substr(0, source_bucket.size() - 1);
339 }
340
341 target_path = config["target_path"];
342 connection_id = config["connection_id"];
343 acls_id = config["acls_id"];
344
345 if (config.exists("connection")) {
346 conn_conf = make_shared<AWSSyncConfig_Connection>();
347 conn_conf->init(config["connection"]);
348 }
349
350 if (config.exists("acls")) {
351 acls = make_shared<ACLMappings>();
352 acls->init(config["acls"]);
353 }
354 }
355
356 void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const {
357 Formatter::ObjectSection config(jf, section);
358 string sb{source_bucket};
359 if (prefix) {
360 sb.append("*");
361 }
362 encode_json("source_bucket", sb, &jf);
363 encode_json("target_path", target_path, &jf);
364 encode_json("connection_id", connection_id, &jf);
365 encode_json("acls_id", acls_id, &jf);
366 if (conn_conf.get()) {
367 conn_conf->dump_conf(cct, jf);
368 }
369 if (acls.get()) {
370 acls->dump_conf(cct, jf);
371 }
372 }
373 };
374
// Replaces every occurrence of `find` in `src` with `replace` and stores the
// result in *dest.
//
// `src` and `*dest` may refer to the same string: the work is done on a
// private copy that is assigned to *dest only at the end. Text inserted by a
// replacement is never rescanned. An empty `find` leaves the text unchanged.
static void find_and_replace(const std::string& src, const std::string& find, const std::string& replace, std::string *dest)
{
  std::string s = src;

  // An empty pattern matches at every position and would never terminate.
  if (!find.empty()) {
    size_t pos = s.find(find);
    while (pos != std::string::npos) {
      s.replace(pos, find.size(), replace);
      // Resume right after the inserted text. The offset has to be computed
      // from the replacement length, not the pattern length, because the
      // string has already been modified at this point.
      pos = s.find(find, pos + replace.size());
    }
  }

  *dest = s;
}
388
389 static void apply_meta_param(const string& src, const string& param, const string& val, string *dest)
390 {
391 string s = string("${") + param + "}";
392 find_and_replace(src, s, val, dest);
393 }
394
395
396 struct AWSSyncConfig {
397 AWSSyncConfig_Profile default_profile;
398 std::shared_ptr<AWSSyncConfig_Profile> root_profile;
399
400 map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections;
401 AWSSyncConfig_ACLProfiles acl_profiles;
402
403 map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles;
404
405 AWSSyncConfig_S3 s3;
406
407 int init_profile(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
408 bool connection_must_exist) {
409 if (!profile.connection_id.empty()) {
410 if (profile.conn_conf) {
411 ldpp_dout(dpp, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
412 return -EINVAL;
413 }
414 if (connections.find(profile.connection_id) == connections.end()) {
415 ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
416 return -EINVAL;
417 }
418 profile.conn_conf = connections[profile.connection_id];
419 } else if (!profile.conn_conf) {
420 profile.connection_id = default_profile.connection_id;
421 auto i = connections.find(profile.connection_id);
422 if (i != connections.end()) {
423 profile.conn_conf = i->second;
424 }
425 }
426
427 if (connection_must_exist && !profile.conn_conf) {
428 ldpp_dout(dpp, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
429 return -EINVAL;
430 }
431
432 if (profile.conn_conf && default_profile.conn_conf) {
433 if (!profile.conn_conf->has_endpoint) {
434 profile.conn_conf->endpoint = default_profile.conn_conf->endpoint;
435 }
436 if (!profile.conn_conf->has_host_style) {
437 profile.conn_conf->host_style = default_profile.conn_conf->host_style;
438 }
439 if (!profile.conn_conf->has_key) {
440 profile.conn_conf->key = default_profile.conn_conf->key;
441 }
442 }
443
444 ACLMappings acl_mappings;
445
446 if (!profile.acls_id.empty()) {
447 if (!acl_profiles.find(profile.acls_id, &acl_mappings)) {
448 ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
449 return -EINVAL;
450 }
451 profile.acls = acl_profiles.acl_profiles[profile.acls_id];
452 } else if (!profile.acls) {
453 if (default_profile.acls) {
454 profile.acls = default_profile.acls;
455 profile.acls_id = default_profile.acls_id;
456 }
457 }
458
459 if (profile.target_path.empty()) {
460 profile.target_path = default_profile.target_path;
461 }
462 if (profile.target_path.empty()) {
463 profile.target_path = default_target_path;
464 }
465
466 return 0;
467 }
468
469 int init_target(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
470 std::shared_ptr<AWSSyncConfig_Profile> profile;
471 profile.reset(new AWSSyncConfig_Profile);
472 profile->init(profile_conf);
473
474 int ret = init_profile(dpp, cct, profile_conf, *profile, true);
475 if (ret < 0) {
476 return ret;
477 }
478
479 auto& sb = profile->source_bucket;
480
481 if (explicit_profiles.find(sb) != explicit_profiles.end()) {
482 ldpp_dout(dpp, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
483 }
484
485 explicit_profiles[sb] = profile;
486 if (ptarget) {
487 *ptarget = profile;
488 }
489 return 0;
490 }
491
492 bool do_find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
493 const string& name = bucket.name;
494 auto iter = explicit_profiles.upper_bound(name);
495 if (iter == explicit_profiles.begin()) {
496 return false;
497 }
498
499 --iter;
500 if (iter->first.size() > name.size()) {
501 return false;
502 }
503 if (name.compare(0, iter->first.size(), iter->first) != 0) {
504 return false;
505 }
506
507 std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second;
508
509 if (!target->prefix &&
510 name.size() != iter->first.size()) {
511 return false;
512 }
513
514 *result = target;
515 return true;
516 }
517
518 void find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
519 if (!do_find_profile(bucket, result)) {
520 *result = root_profile;
521 }
522 }
523
524 AWSSyncConfig() {}
525
526 int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
527 auto& default_conf = config["default"];
528
529 if (config.exists("default")) {
530 default_profile.init(default_conf);
531 init_profile(dpp, cct, default_conf, default_profile, false);
532 }
533
534 for (auto& conn : config["connections"].array()) {
535 auto new_conn = conn;
536
537 std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection};
538 c->init(new_conn);
539
540 connections[new_conn["id"]] = c;
541 }
542
543 acl_profiles.init(config["acl_profiles"]);
544
545 int r = s3.init(dpp, cct, config["s3"]);
546 if (r < 0) {
547 return r;
548 }
549
550 auto new_root_conf = config;
551
552 r = init_target(dpp, cct, new_root_conf, &root_profile); /* the root profile config */
553 if (r < 0) {
554 return r;
555 }
556
557 for (auto target_conf : config["profiles"].array()) {
558 int r = init_target(dpp, cct, target_conf, nullptr);
559 if (r < 0) {
560 return r;
561 }
562 }
563
564 JSONFormatter jf(true);
565 dump_conf(cct, jf);
566 stringstream ss;
567 jf.flush(ss);
568
569 ldpp_dout(dpp, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;
570
571 return 0;
572 }
573
574 void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) {
575 apply_meta_param(path, "sid", sid, dest);
576
577 const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup();
578 apply_meta_param(path, "zonegroup", zg.get_name(), dest);
579 apply_meta_param(path, "zonegroup_id", zg.get_id(), dest);
580
581 const RGWZone& zone = sc->env->svc->zone->get_zone();
582 apply_meta_param(path, "zone", zone.name, dest);
583 apply_meta_param(path, "zone_id", zone.id, dest);
584 }
585
586 void update_config(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, const string& sid) {
587 expand_target(sc, sid, root_profile->target_path, &root_profile->target_path);
588 ldpp_dout(dpp, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
589 for (auto& t : explicit_profiles) {
590 expand_target(sc, sid, t.second->target_path, &t.second->target_path);
591 ldpp_dout(dpp, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
592 }
593 }
594
595 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
596 Formatter::ObjectSection config(jf, "config");
597 root_profile->dump_conf(cct, jf);
598 jf.open_array_section("connections");
599 for (auto c : connections) {
600 c.second->dump_conf(cct, jf);
601 }
602 jf.close_section();
603
604 acl_profiles.dump_conf(cct, jf);
605
606 { // targets
607 Formatter::ArraySection as(jf, "profiles");
608 for (auto& t : explicit_profiles) {
609 Formatter::ObjectSection target_section(jf, "profile");
610 encode_json("name", t.first, &jf);
611 t.second->dump_conf(cct, jf);
612 }
613 }
614 }
615
616 string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile,
617 const RGWBucketInfo& bucket_info,
618 const rgw_obj_key& obj) {
619 string bucket_str;
620 string owner;
621 if (!bucket_info.owner.tenant.empty()) {
622 bucket_str = owner = bucket_info.owner.tenant + "-";
623 owner += bucket_info.owner.id;
624 }
625 bucket_str += bucket_info.bucket.name;
626
627 const string& path = profile->target_path;
628
629 string new_path;
630 apply_meta_param(path, "bucket", bucket_str, &new_path);
631 apply_meta_param(new_path, "owner", owner, &new_path);
632
633 new_path += string("/") + get_key_oid(obj);
634
635 return new_path;
636 }
637
638 void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile,
639 const RGWBucketInfo& bucket_info,
640 const rgw_obj_key& obj,
641 string *bucket_name,
642 string *obj_name) {
643 string path = get_path(profile, bucket_info, obj);
644 size_t pos = path.find('/');
645
646 *bucket_name = path.substr(0, pos);
647 *obj_name = path.substr(pos + 1);
648 }
649
650 void init_conns(RGWDataSyncCtx *sc, const string& id) {
651 auto sync_env = sc->env;
652
653 update_config(sync_env->dpp, sc, id);
654
655 auto& root_conf = root_profile->conn_conf;
656
657 root_profile->conn.reset(new S3RESTConn(sc->cct,
658 sync_env->svc->zone,
659 id,
660 { root_conf->endpoint },
661 root_conf->key,
662 root_conf->region,
663 root_conf->host_style));
664
665 for (auto i : explicit_profiles) {
666 auto& c = i.second;
667
668 c->conn.reset(new S3RESTConn(sc->cct,
669 sync_env->svc->zone,
670 id,
671 { c->conn_conf->endpoint },
672 c->conn_conf->key,
673 c->conn_conf->region,
674 c->conn_conf->host_style));
675 }
676 }
677 };
678
679
680 struct AWSSyncInstanceEnv {
681 AWSSyncConfig conf;
682 string id;
683
684 explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {}
685
686 void init(RGWDataSyncCtx *sc, uint64_t instance_id) {
687 char buf[32];
688 snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
689 id = buf;
690
691 conf.init_conns(sc, id);
692 }
693
694 void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
695 conf.find_profile(bucket, ptarget);
696 ceph_assert(ptarget);
697 }
698 };
699
700 static int do_decode_rest_obj(const DoutPrefixProvider *dpp, CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
701 {
702 for (auto header : headers) {
703 const string& val = header.second;
704 if (header.first == "RGWX_OBJECT_SIZE") {
705 info->content_len = atoi(val.c_str());
706 } else {
707 info->attrs[header.first] = val;
708 }
709 }
710
711 info->acls.set_ctx(cct);
712 auto aiter = attrs.find(RGW_ATTR_ACL);
713 if (aiter != attrs.end()) {
714 bufferlist& bl = aiter->second;
715 auto bliter = bl.cbegin();
716 try {
717 info->acls.decode(bliter);
718 } catch (buffer::error& err) {
719 ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
720 return -EIO;
721 }
722 } else {
723 ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
724 }
725
726 return 0;
727 }
728
729 class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
730 {
731 RGWDataSyncCtx *sc;
732 RGWRESTConn *conn;
733 rgw::sal::Object* src_obj;
734 RGWRESTConn::get_obj_params req_params;
735
736 rgw_sync_aws_src_obj_properties src_properties;
737 public:
738 RGWRESTStreamGetCRF(CephContext *_cct,
739 RGWCoroutinesEnv *_env,
740 RGWCoroutine *_caller,
741 RGWDataSyncCtx *_sc,
742 RGWRESTConn *_conn,
743 rgw::sal::Object* _src_obj,
744 const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller,
745 _sc->env->http_manager, _src_obj->get_key()),
746 sc(_sc), conn(_conn), src_obj(_src_obj),
747 src_properties(_src_properties) {
748 }
749
750 int init(const DoutPrefixProvider *dpp) override {
751 /* init input connection */
752
753
754 req_params.get_op = true;
755 req_params.prepend_metadata = true;
756
757 req_params.unmod_ptr = &src_properties.mtime;
758 req_params.etag = src_properties.etag;
759 req_params.mod_zone_id = src_properties.zone_short_id;
760 req_params.mod_pg_ver = src_properties.pg_ver;
761
762 if (range.is_set) {
763 req_params.range_is_set = true;
764 req_params.range_start = range.ofs;
765 req_params.range_end = range.ofs + range.size - 1;
766 }
767
768 RGWRESTStreamRWRequest *in_req;
769 int ret = conn->get_obj(dpp, src_obj, req_params, false /* send */, &in_req);
770 if (ret < 0) {
771 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
772 return ret;
773 }
774
775 set_req(in_req);
776
777 return RGWStreamReadHTTPResourceCRF::init(dpp);
778 }
779
780 int decode_rest_obj(const DoutPrefixProvider *dpp, map<string, string>& headers, bufferlist& extra_data) override {
781 map<string, bufferlist> src_attrs;
782
783 ldpp_dout(dpp, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
784
785 if (extra_data.length() > 0) {
786 JSONParser jp;
787 if (!jp.parse(extra_data.c_str(), extra_data.length())) {
788 ldpp_dout(dpp, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
789 return -EIO;
790 }
791
792 JSONDecoder::decode_json("attrs", src_attrs, &jp);
793 }
794 return do_decode_rest_obj(dpp, sc->cct, src_attrs, headers, &rest_obj);
795 }
796
797 bool need_extra_data() override {
798 return true;
799 }
800 };
801
// Source object attributes that are passed through to the destination as-is.
static std::set<std::string> keep_headers{
  "CONTENT_TYPE",
  "CONTENT_ENCODING",
  "CONTENT_DISPOSITION",
  "CONTENT_LANGUAGE",
};
806
807 class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
808 {
809 RGWDataSyncCtx *sc;
810 rgw_sync_aws_src_obj_properties src_properties;
811 std::shared_ptr<AWSSyncConfig_Profile> target;
812 rgw::sal::Object* dest_obj;
813 string etag;
814 public:
815 RGWAWSStreamPutCRF(CephContext *_cct,
816 RGWCoroutinesEnv *_env,
817 RGWCoroutine *_caller,
818 RGWDataSyncCtx *_sc,
819 const rgw_sync_aws_src_obj_properties& _src_properties,
820 std::shared_ptr<AWSSyncConfig_Profile>& _target,
821 rgw::sal::Object* _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager),
822 sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) {
823 }
824
825 int init() override {
826 /* init output connection */
827 RGWRESTStreamS3PutObj *out_req{nullptr};
828
829 if (multipart.is_multipart) {
830 char buf[32];
831 snprintf(buf, sizeof(buf), "%d", multipart.part_num);
832 rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
833 { "partNumber", buf },
834 { nullptr, nullptr } };
835 target->conn->put_obj_send_init(dest_obj, params, &out_req);
836 } else {
837 target->conn->put_obj_send_init(dest_obj, nullptr, &out_req);
838 }
839
840 set_req(out_req);
841
842 return RGWStreamWriteHTTPResourceCRF::init();
843 }
844
845 static bool keep_attr(const string& h) {
846 return (keep_headers.find(h) != keep_headers.end() ||
847 boost::algorithm::starts_with(h, "X_AMZ_"));
848 }
849
850 static void init_send_attrs(const DoutPrefixProvider *dpp,
851 CephContext *cct,
852 const rgw_rest_obj& rest_obj,
853 const rgw_sync_aws_src_obj_properties& src_properties,
854 const AWSSyncConfig_Profile *target,
855 map<string, string> *attrs) {
856 auto& new_attrs = *attrs;
857
858 new_attrs.clear();
859
860 for (auto& hi : rest_obj.attrs) {
861 if (keep_attr(hi.first)) {
862 new_attrs.insert(hi);
863 }
864 }
865
866 auto acl = rest_obj.acls.get_acl();
867
868 map<int, vector<string> > access_map;
869
870 if (target->acls) {
871 for (auto& grant : acl.get_grant_map()) {
872 auto& orig_grantee = grant.first;
873 auto& perm = grant.second;
874
875 string grantee;
876
877 const auto& am = target->acls->acl_mappings;
878
879 auto iter = am.find(orig_grantee);
880 if (iter == am.end()) {
881 ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
882 continue;
883 }
884
885 grantee = iter->second.dest_id;
886
887 string type;
888
889 switch (iter->second.type) {
890 case ACL_TYPE_CANON_USER:
891 type = "id";
892 break;
893 case ACL_TYPE_EMAIL_USER:
894 type = "emailAddress";
895 break;
896 case ACL_TYPE_GROUP:
897 type = "uri";
898 break;
899 default:
900 continue;
901 }
902
903 string tv = type + "=" + grantee;
904
905 int flags = perm.get_permission().get_permissions();
906 if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
907 access_map[flags].push_back(tv);
908 continue;
909 }
910
911 for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
912 if (flags & i) {
913 access_map[i].push_back(tv);
914 }
915 }
916 }
917 }
918
919 for (auto aiter : access_map) {
920 int grant_type = aiter.first;
921
922 string header_str("x-amz-grant-");
923
924 switch (grant_type) {
925 case RGW_PERM_READ:
926 header_str.append("read");
927 break;
928 case RGW_PERM_WRITE:
929 header_str.append("write");
930 break;
931 case RGW_PERM_READ_ACP:
932 header_str.append("read-acp");
933 break;
934 case RGW_PERM_WRITE_ACP:
935 header_str.append("write-acp");
936 break;
937 case RGW_PERM_FULL_CONTROL:
938 header_str.append("full-control");
939 break;
940 }
941
942 string s;
943
944 for (auto viter : aiter.second) {
945 if (!s.empty()) {
946 s.append(", ");
947 }
948 s.append(viter);
949 }
950
951 ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
952
953 new_attrs[header_str] = s;
954 }
955
956 char buf[32];
957 snprintf(buf, sizeof(buf), "%llu", (long long)src_properties.versioned_epoch);
958 new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
959
960 utime_t ut(src_properties.mtime);
961 snprintf(buf, sizeof(buf), "%lld.%09lld",
962 (long long)ut.sec(),
963 (long long)ut.nsec());
964
965 new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
966 new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag;
967 new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
968 if (!rest_obj.key.instance.empty()) {
969 new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
970 }
971 }
972
973 void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
974 RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
975
976 map<string, string> new_attrs;
977 if (!multipart.is_multipart) {
978 init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
979 }
980
981 r->set_send_length(rest_obj.content_len);
982
983 RGWAccessControlPolicy policy;
984
985 r->send_ready(dpp, target->conn->get_key(), new_attrs, policy);
986 }
987
988 void handle_headers(const map<string, string>& headers) {
989 for (auto h : headers) {
990 if (h.first == "ETAG") {
991 etag = h.second;
992 }
993 }
994 }
995
996 bool get_etag(string *petag) {
997 if (etag.empty()) {
998 return false;
999 }
1000 *petag = etag;
1001 return true;
1002 }
1003 };
1004
1005
1006 class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
1007 RGWDataSyncCtx *sc;
1008 RGWRESTConn *source_conn;
1009 std::shared_ptr<AWSSyncConfig_Profile> target;
1010 rgw::sal::Object* src_obj;
1011 rgw::sal::Object* dest_obj;
1012
1013 rgw_sync_aws_src_obj_properties src_properties;
1014
1015 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
1016 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
1017
1018 public:
1019 RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc,
1020 RGWRESTConn *_source_conn,
1021 rgw::sal::Object* _src_obj,
1022 const rgw_sync_aws_src_obj_properties& _src_properties,
1023 std::shared_ptr<AWSSyncConfig_Profile> _target,
1024 rgw::sal::Object* _dest_obj) : RGWCoroutine(_sc->cct),
1025 sc(_sc),
1026 source_conn(_source_conn),
1027 target(_target),
1028 src_obj(_src_obj),
1029 dest_obj(_dest_obj),
1030 src_properties(_src_properties) {}
1031
1032 int operate(const DoutPrefixProvider *dpp) override {
1033 reenter(this) {
1034 /* init input */
1035 in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
1036 source_conn, src_obj,
1037 src_properties));
1038
1039 /* init output */
1040 out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
1041 src_properties, target, dest_obj));
1042
1043 yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
1044 if (retcode < 0) {
1045 return set_cr_error(retcode);
1046 }
1047
1048 return set_cr_done();
1049 }
1050
1051 return 0;
1052 }
1053 };
1054
1055 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
1056 RGWDataSyncCtx *sc;
1057 RGWRESTConn *source_conn;
1058 std::shared_ptr<AWSSyncConfig_Profile> target;
1059 rgw::sal::Object* src_obj;
1060 rgw::sal::Object* dest_obj;
1061
1062 rgw_sync_aws_src_obj_properties src_properties;
1063
1064 string upload_id;
1065
1066 rgw_sync_aws_multipart_part_info part_info;
1067
1068 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
1069 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
1070
1071 string *petag;
1072
1073 public:
1074 RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc,
1075 RGWRESTConn *_source_conn,
1076 rgw::sal::Object* _src_obj,
1077 std::shared_ptr<AWSSyncConfig_Profile>& _target,
1078 rgw::sal::Object* _dest_obj,
1079 const rgw_sync_aws_src_obj_properties& _src_properties,
1080 const string& _upload_id,
1081 const rgw_sync_aws_multipart_part_info& _part_info,
1082 string *_petag) : RGWCoroutine(_sc->cct),
1083 sc(_sc),
1084 source_conn(_source_conn),
1085 target(_target),
1086 src_obj(_src_obj),
1087 dest_obj(_dest_obj),
1088 src_properties(_src_properties),
1089 upload_id(_upload_id),
1090 part_info(_part_info),
1091 petag(_petag) {}
1092
1093 int operate(const DoutPrefixProvider *dpp) override {
1094 reenter(this) {
1095 /* init input */
1096 in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
1097 source_conn, src_obj,
1098 src_properties));
1099
1100 in_crf->set_range(part_info.ofs, part_info.size);
1101
1102 /* init output */
1103 out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
1104 src_properties, target, dest_obj));
1105
1106 out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
1107
1108 yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
1109 if (retcode < 0) {
1110 return set_cr_error(retcode);
1111 }
1112
1113 if (!(static_cast<RGWAWSStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
1114 ldpp_dout(dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl;
1115 return set_cr_error(-EIO);
1116 }
1117
1118 return set_cr_done();
1119 }
1120
1121 return 0;
1122 }
1123 };
1124
1125 class RGWAWSAbortMultipartCR : public RGWCoroutine {
1126 RGWDataSyncCtx *sc;
1127 RGWRESTConn *dest_conn;
1128 rgw::sal::Object* dest_obj;
1129
1130 string upload_id;
1131
1132 public:
1133 RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc,
1134 RGWRESTConn *_dest_conn,
1135 rgw::sal::Object* _dest_obj,
1136 const string& _upload_id) : RGWCoroutine(_sc->cct),
1137 sc(_sc),
1138 dest_conn(_dest_conn),
1139 dest_obj(_dest_obj),
1140 upload_id(_upload_id) {}
1141
1142 int operate(const DoutPrefixProvider *dpp) override {
1143 reenter(this) {
1144
1145 yield {
1146 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
1147 bufferlist bl;
1148 call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager,
1149 obj_to_aws_path(dest_obj), params));
1150 }
1151
1152 if (retcode < 0) {
1153 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
1154 return set_cr_error(retcode);
1155 }
1156
1157 return set_cr_done();
1158 }
1159
1160 return 0;
1161 }
1162 };
1163
1164 class RGWAWSInitMultipartCR : public RGWCoroutine {
1165 RGWDataSyncCtx *sc;
1166 RGWRESTConn *dest_conn;
1167 rgw::sal::Object* dest_obj;
1168
1169 uint64_t obj_size;
1170 map<string, string> attrs;
1171
1172 bufferlist out_bl;
1173
1174 string *upload_id;
1175
1176 struct InitMultipartResult {
1177 string bucket;
1178 string key;
1179 string upload_id;
1180
1181 void decode_xml(XMLObj *obj) {
1182 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
1183 RGWXMLDecoder::decode_xml("Key", key, obj);
1184 RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
1185 }
1186 } result;
1187
1188 public:
1189 RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc,
1190 RGWRESTConn *_dest_conn,
1191 rgw::sal::Object* _dest_obj,
1192 uint64_t _obj_size,
1193 const map<string, string>& _attrs,
1194 string *_upload_id) : RGWCoroutine(_sc->cct),
1195 sc(_sc),
1196 dest_conn(_dest_conn),
1197 dest_obj(_dest_obj),
1198 obj_size(_obj_size),
1199 attrs(_attrs),
1200 upload_id(_upload_id) {}
1201
1202 int operate(const DoutPrefixProvider *dpp) override {
1203 reenter(this) {
1204
1205 yield {
1206 rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
1207 bufferlist bl;
1208 call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
1209 obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
1210 }
1211
1212 if (retcode < 0) {
1213 ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
1214 return set_cr_error(retcode);
1215 }
1216 {
1217 /*
1218 * If one of the following fails we cannot abort upload, as we cannot
1219 * extract the upload id. If one of these fail it's very likely that that's
1220 * the least of our problem.
1221 */
1222 RGWXMLDecoder::XMLParser parser;
1223 if (!parser.init()) {
1224 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1225 return set_cr_error(-EIO);
1226 }
1227
1228 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1229 string str(out_bl.c_str(), out_bl.length());
1230 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
1231 return set_cr_error(-EIO);
1232 }
1233
1234 try {
1235 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
1236 } catch (RGWXMLDecoder::err& err) {
1237 string str(out_bl.c_str(), out_bl.length());
1238 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1239 return set_cr_error(-EIO);
1240 }
1241 }
1242
1243 ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
1244
1245 *upload_id = result.upload_id;
1246
1247 return set_cr_done();
1248 }
1249
1250 return 0;
1251 }
1252 };
1253
1254 class RGWAWSCompleteMultipartCR : public RGWCoroutine {
1255 RGWDataSyncCtx *sc;
1256 RGWRESTConn *dest_conn;
1257 rgw::sal::Object* dest_obj;
1258
1259 bufferlist out_bl;
1260
1261 string upload_id;
1262
1263 struct CompleteMultipartReq {
1264 map<int, rgw_sync_aws_multipart_part_info> parts;
1265
1266 explicit CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}
1267
1268 void dump_xml(Formatter *f) const {
1269 for (auto p : parts) {
1270 f->open_object_section("Part");
1271 encode_xml("PartNumber", p.first, f);
1272 encode_xml("ETag", p.second.etag, f);
1273 f->close_section();
1274 };
1275 }
1276 } req_enc;
1277
1278 struct CompleteMultipartResult {
1279 string location;
1280 string bucket;
1281 string key;
1282 string etag;
1283
1284 void decode_xml(XMLObj *obj) {
1285 RGWXMLDecoder::decode_xml("Location", bucket, obj);
1286 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
1287 RGWXMLDecoder::decode_xml("Key", key, obj);
1288 RGWXMLDecoder::decode_xml("ETag", etag, obj);
1289 }
1290 } result;
1291
1292 public:
1293 RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc,
1294 RGWRESTConn *_dest_conn,
1295 rgw::sal::Object* _dest_obj,
1296 string _upload_id,
1297 const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sc->cct),
1298 sc(_sc),
1299 dest_conn(_dest_conn),
1300 dest_obj(_dest_obj),
1301 upload_id(_upload_id),
1302 req_enc(_parts) {}
1303
1304 int operate(const DoutPrefixProvider *dpp) override {
1305 reenter(this) {
1306
1307 yield {
1308 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
1309 stringstream ss;
1310 XMLFormatter formatter;
1311
1312 encode_xml("CompleteMultipartUpload", req_enc, &formatter);
1313
1314 formatter.flush(ss);
1315
1316 bufferlist bl;
1317 bl.append(ss.str());
1318
1319 call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
1320 obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
1321 }
1322
1323 if (retcode < 0) {
1324 ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
1325 return set_cr_error(retcode);
1326 }
1327 {
1328 /*
1329 * If one of the following fails we cannot abort upload, as we cannot
1330 * extract the upload id. If one of these fail it's very likely that that's
1331 * the least of our problem.
1332 */
1333 RGWXMLDecoder::XMLParser parser;
1334 if (!parser.init()) {
1335 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1336 return set_cr_error(-EIO);
1337 }
1338
1339 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1340 string str(out_bl.c_str(), out_bl.length());
1341 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
1342 return set_cr_error(-EIO);
1343 }
1344
1345 try {
1346 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
1347 } catch (RGWXMLDecoder::err& err) {
1348 string str(out_bl.c_str(), out_bl.length());
1349 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1350 return set_cr_error(-EIO);
1351 }
1352 }
1353
1354 ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
1355
1356 return set_cr_done();
1357 }
1358
1359 return 0;
1360 }
1361 };
1362
1363
1364 class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
1365 RGWDataSyncCtx *sc;
1366 RGWRESTConn *dest_conn;
1367 rgw::sal::Object* dest_obj;
1368 const rgw_raw_obj status_obj;
1369
1370 string upload_id;
1371
1372 public:
1373
1374 RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc,
1375 RGWRESTConn *_dest_conn,
1376 rgw::sal::Object* _dest_obj,
1377 const rgw_raw_obj& _status_obj,
1378 const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc),
1379 dest_conn(_dest_conn),
1380 dest_obj(_dest_obj),
1381 status_obj(_status_obj),
1382 upload_id(_upload_id) {}
1383
1384 int operate(const DoutPrefixProvider *dpp) override {
1385 reenter(this) {
1386 yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id));
1387 if (retcode < 0) {
1388 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
1389 /* ignore error, best effort */
1390 }
1391 yield call(new RGWRadosRemoveCR(sc->env->store, status_obj));
1392 if (retcode < 0) {
1393 ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
1394 /* ignore error, best effort */
1395 }
1396 return set_cr_done();
1397 }
1398
1399 return 0;
1400 }
1401 };
1402
1403 class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
1404 RGWDataSyncCtx *sc;
1405 RGWDataSyncEnv *sync_env;
1406 AWSSyncConfig& conf;
1407 RGWRESTConn *source_conn;
1408 std::shared_ptr<AWSSyncConfig_Profile> target;
1409 rgw::sal::Object* src_obj;
1410 rgw::sal::Object* dest_obj;
1411
1412 uint64_t obj_size;
1413 string src_etag;
1414 rgw_sync_aws_src_obj_properties src_properties;
1415 rgw_rest_obj rest_obj;
1416
1417 rgw_sync_aws_multipart_upload_info status;
1418
1419 map<string, string> new_attrs;
1420
1421 rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
1422
1423 int ret_err{0};
1424
1425 rgw_raw_obj status_obj;
1426
1427 public:
1428 RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc,
1429 rgw_bucket_sync_pipe& _sync_pipe,
1430 AWSSyncConfig& _conf,
1431 RGWRESTConn *_source_conn,
1432 rgw::sal::Object* _src_obj,
1433 std::shared_ptr<AWSSyncConfig_Profile>& _target,
1434 rgw::sal::Object* _dest_obj,
1435 uint64_t _obj_size,
1436 const rgw_sync_aws_src_obj_properties& _src_properties,
1437 const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct),
1438 sc(_sc),
1439 sync_env(_sc->env),
1440 conf(_conf),
1441 source_conn(_source_conn),
1442 target(_target),
1443 src_obj(_src_obj),
1444 dest_obj(_dest_obj),
1445 obj_size(_obj_size),
1446 src_properties(_src_properties),
1447 rest_obj(_rest_obj),
1448 status_obj(sync_env->svc->zone->get_zone_params().log_pool,
1449 RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) {
1450 }
1451
1452
1453 int operate(const DoutPrefixProvider *dpp) override {
1454 reenter(this) {
1455 yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
1456 status_obj, &status, false));
1457
1458 if (retcode < 0 && retcode != -ENOENT) {
1459 ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
1460 return retcode;
1461 }
1462
1463 if (retcode >= 0) {
1464 /* check here that mtime and size did not change */
1465
1466 if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
1467 status.src_properties.etag != src_properties.etag) {
1468 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
1469 retcode = -ENOENT;
1470 }
1471 }
1472
1473 if (retcode == -ENOENT) {
1474 RGWAWSStreamPutCRF::init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
1475
1476 yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id));
1477 if (retcode < 0) {
1478 return set_cr_error(retcode);
1479 }
1480
1481 status.obj_size = obj_size;
1482 status.src_properties = src_properties;
1483 #define MULTIPART_MAX_PARTS 10000
1484 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
1485 status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
1486 status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
1487 status.cur_part = 1;
1488 }
1489
1490 for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
1491 yield {
1492 rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
1493 cur_part_info.part_num = status.cur_part;
1494 cur_part_info.ofs = status.cur_ofs;
1495 cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
1496
1497 pcur_part_info = &cur_part_info;
1498
1499 status.cur_ofs += status.part_size;
1500
1501 call(new RGWAWSStreamObjToCloudMultipartPartCR(sc,
1502 source_conn, src_obj,
1503 target,
1504 dest_obj,
1505 status.src_properties,
1506 status.upload_id,
1507 cur_part_info,
1508 &cur_part_info.etag));
1509 }
1510
1511 if (retcode < 0) {
1512 ldpp_dout(dpp, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
1513 ret_err = retcode;
1514 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
1515 return set_cr_error(ret_err);
1516 }
1517
1518 yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status));
1519 if (retcode < 0) {
1520 ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
1521 /* continue with upload anyway */
1522 }
1523 ldpp_dout(dpp, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
1524 }
1525
1526 yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts));
1527 if (retcode < 0) {
1528 ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
1529 ret_err = retcode;
1530 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
1531 return set_cr_error(ret_err);
1532 }
1533
1534 /* remove status obj */
1535 yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
1536 if (retcode < 0) {
1537 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
1538 /* ignore error, best effort */
1539 }
1540 return set_cr_done();
1541 }
1542
1543 return 0;
1544 }
1545 };
1546 template <class T>
1547 int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
1548 {
1549 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1550 if (iter == attrs.end()) {
1551 *result = def_val;
1552 return 0;
1553 }
1554 bufferlist& bl = iter->second;
1555 if (bl.length() == 0) {
1556 *result = def_val;
1557 return 0;
1558 }
1559 auto bliter = bl.cbegin();
1560 try {
1561 decode(*result, bliter);
1562 } catch (buffer::error& err) {
1563 return -EIO;
1564 }
1565 return 0;
1566 }
1567
1568 // maybe use Fetch Remote Obj instead?
1569 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
1570 rgw_bucket_sync_pipe sync_pipe;
1571 AWSSyncInstanceEnv& instance;
1572
1573 uint64_t versioned_epoch{0};
1574
1575 RGWRESTConn *source_conn{nullptr};
1576 std::shared_ptr<AWSSyncConfig_Profile> target;
1577 bufferlist res;
1578 unordered_map <string, bool> bucket_created;
1579 string target_bucket_name;
1580 string target_obj_name;
1581 rgw_rest_obj rest_obj;
1582 int ret{0};
1583
1584 uint32_t src_zone_short_id{0};
1585 uint64_t src_pg_ver{0};
1586
1587 bufferlist out_bl;
1588
1589 struct CreateBucketResult {
1590 string code;
1591
1592 void decode_xml(XMLObj *obj) {
1593 RGWXMLDecoder::decode_xml("Code", code, obj);
1594 }
1595 } result;
1596
1597 public:
1598 RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1599 rgw_bucket_sync_pipe& _sync_pipe,
1600 rgw_obj_key& _key,
1601 AWSSyncInstanceEnv& _instance,
1602 uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1603 sync_pipe(_sync_pipe),
1604 instance(_instance), versioned_epoch(_versioned_epoch)
1605 {}
1606
1607 ~RGWAWSHandleRemoteObjCBCR(){
1608 }
1609
1610 int operate(const DoutPrefixProvider *dpp) override {
1611 reenter(this) {
1612 ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
1613 if (ret < 0) {
1614 ldpp_dout(dpp, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
1615 } else {
1616 ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
1617 if (ret < 0) {
1618 ldpp_dout(dpp, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
1619 src_pg_ver = 0; /* all or nothing */
1620 }
1621 }
1622 ldpp_dout(dpp, 4) << "AWS: download begin: z=" << sc->source_zone
1623 << " b=" << src_bucket << " k=" << key << " size=" << size
1624 << " mtime=" << mtime << " etag=" << etag
1625 << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
1626 << dendl;
1627
1628 source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
1629 if (!source_conn) {
1630 ldpp_dout(dpp, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl;
1631 return set_cr_error(-EINVAL);
1632 }
1633
1634 instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
1635 instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
1636
1637 if (bucket_created.find(target_bucket_name) == bucket_created.end()){
1638 yield {
1639 ldpp_dout(dpp, 0) << "AWS: creating bucket " << target_bucket_name << dendl;
1640 bufferlist bl;
1641 call(new RGWPutRawRESTResourceCR <bufferlist> (sc->cct, target->conn.get(),
1642 sync_env->http_manager,
1643 target_bucket_name, nullptr, bl, &out_bl));
1644 }
1645 if (retcode < 0 ) {
1646 RGWXMLDecoder::XMLParser parser;
1647 if (!parser.init()) {
1648 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1649 return set_cr_error(retcode);
1650 }
1651
1652 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1653 string str(out_bl.c_str(), out_bl.length());
1654 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
1655 return set_cr_error(retcode);
1656 }
1657
1658 try {
1659 RGWXMLDecoder::decode_xml("Error", result, &parser, true);
1660 } catch (RGWXMLDecoder::err& err) {
1661 string str(out_bl.c_str(), out_bl.length());
1662 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1663 return set_cr_error(retcode);
1664 }
1665
1666 if (result.code != "BucketAlreadyOwnedByYou") {
1667 return set_cr_error(retcode);
1668 }
1669 }
1670
1671 bucket_created[target_bucket_name] = true;
1672 }
1673
1674 yield {
1675 rgw::sal::RadosBucket bucket(sync_env->store, src_bucket);
1676 rgw::sal::RadosObject src_obj(sync_env->store, key, &bucket);
1677
1678 /* init output */
1679 rgw_bucket target_bucket;
1680 target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
1681 uri resolution */
1682 rgw::sal::RadosBucket dest_bucket(sync_env->store, target_bucket);
1683 rgw::sal::RadosObject dest_obj(sync_env->store, rgw_obj_key(target_obj_name), &dest_bucket);
1684
1685
1686 rgw_sync_aws_src_obj_properties src_properties;
1687 src_properties.mtime = mtime;
1688 src_properties.etag = etag;
1689 src_properties.zone_short_id = src_zone_short_id;
1690 src_properties.pg_ver = src_pg_ver;
1691 src_properties.versioned_epoch = versioned_epoch;
1692
1693 if (size < instance.conf.s3.multipart_sync_threshold) {
1694 call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, &src_obj,
1695 src_properties,
1696 target,
1697 &dest_obj));
1698 } else {
1699 rgw_rest_obj rest_obj;
1700 rest_obj.init(key);
1701 if (do_decode_rest_obj(dpp, sc->cct, attrs, headers, &rest_obj)) {
1702 ldpp_dout(dpp, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
1703 return set_cr_error(-EINVAL);
1704 }
1705 call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, &src_obj,
1706 target, &dest_obj, size, src_properties, rest_obj));
1707 }
1708 }
1709 if (retcode < 0) {
1710 return set_cr_error(retcode);
1711 }
1712
1713 return set_cr_done();
1714 }
1715
1716 return 0;
1717 }
1718 };
1719
1720 class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
1721 rgw_bucket_sync_pipe sync_pipe;
1722 AWSSyncInstanceEnv& instance;
1723 uint64_t versioned_epoch;
1724 public:
1725 RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1726 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1727 AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1728 sync_pipe(_sync_pipe),
1729 instance(_instance), versioned_epoch(_versioned_epoch) {
1730 }
1731
1732 ~RGWAWSHandleRemoteObjCR() {}
1733
1734 RGWStatRemoteObjCBCR *allocate_callback() override {
1735 return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch);
1736 }
1737 };
1738
1739 class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
1740 RGWDataSyncCtx *sc;
1741 std::shared_ptr<AWSSyncConfig_Profile> target;
1742 rgw_bucket_sync_pipe sync_pipe;
1743 rgw_obj_key key;
1744 ceph::real_time mtime;
1745 AWSSyncInstanceEnv& instance;
1746 int ret{0};
1747 public:
1748 RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
1749 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1750 AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc),
1751 sync_pipe(_sync_pipe), key(_key),
1752 mtime(_mtime), instance(_instance) {}
1753 int operate(const DoutPrefixProvider *dpp) override {
1754 reenter(this) {
1755 ldpp_dout(dpp, 0) << ": remove remote obj: z=" << sc->source_zone
1756 << " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
1757 yield {
1758 instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
1759 string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
1760 ldpp_dout(dpp, 0) << "AWS: removing aws object at" << path << dendl;
1761
1762 call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(),
1763 sc->env->http_manager,
1764 path, nullptr /* params */));
1765 }
1766 if (retcode < 0) {
1767 return set_cr_error(retcode);
1768 }
1769 return set_cr_done();
1770 }
1771 return 0;
1772 }
1773
1774 };
1775
1776
1777 class RGWAWSDataSyncModule: public RGWDataSyncModule {
1778 CephContext *cct;
1779 AWSSyncInstanceEnv instance;
1780 public:
1781 RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) :
1782 cct(_cct),
1783 instance(_conf) {
1784 }
1785
1786 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1787 instance.init(sc, instance_id);
1788 }
1789
1790 ~RGWAWSDataSyncModule() {}
1791
1792 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
1793 std::optional<uint64_t> versioned_epoch,
1794 rgw_zone_set *zones_trace) override {
1795 ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1796 return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
1797 }
1798 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
1799 rgw_zone_set *zones_trace) override {
1800 ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1801 return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
1802 }
1803 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
1804 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
1805 rgw_zone_set *zones_trace) override {
1806 ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
1807 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1808 return NULL;
1809 }
1810 };
1811
1812 class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
1813 RGWAWSDataSyncModule data_handler;
1814 public:
1815 RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
1816 RGWDataSyncModule *get_data_handler() override {
1817 return &data_handler;
1818 }
1819 };
1820
1821 int RGWAWSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
1822 AWSSyncConfig conf;
1823
1824 int r = conf.init(dpp, cct, config);
1825 if (r < 0) {
1826 return r;
1827 }
1828
1829 instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
1830 return 0;
1831 }