]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_sync_module_aws.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_sync_module_aws.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/errno.h"
5
6 #include "rgw_common.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_sync_module.h"
9 #include "rgw_data_sync.h"
10 #include "rgw_sync_module_aws.h"
11 #include "rgw_cr_rados.h"
12 #include "rgw_rest_conn.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_acl.h"
15 #include "rgw_zone.h"
16
17 #include "services/svc_zone.h"
18
19 #include <boost/asio/yield.hpp>
20
21 #define dout_subsys ceph_subsys_rgw
22
23
24 #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
25
26 using namespace std;
27
28 static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}";
29
30 static string get_key_oid(const rgw_obj_key& key)
31 {
32 string oid = key.name;
33 if (!key.instance.empty() &&
34 !key.have_null_instance()) {
35 oid += string(":") + key.instance;
36 }
37 return oid;
38 }
39
40 static string obj_to_aws_path(const rgw_obj& obj)
41 {
42 return obj.bucket.name + "/" + get_key_oid(obj.key);
43 }
44
45 /*
46
47 json configuration definition:
48
49 {
50 "connection": {
51 "access_key": <access>,
52 "secret": <secret>,
53 "endpoint": <endpoint>,
54 "host_style": <path | virtual>,
55 },
56 "acls": [ { "type": <id | email | uri>,
57 "source_id": <source_id>,
58 "dest_id": <dest_id> } ... ], # optional, acl mappings; no mappings if it does not exist
59 "target_path": <target_path>, # override default
60
61
62 # anything below here is for non trivial configuration
63 # can be used in conjunction with the above
64
65 "default": {
66 "connection": {
67 "access_key": <access>,
68 "secret": <secret>,
69 "endpoint": <endpoint>,
70 "host_style": <path | virtual>,
71 },
72 "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
73 {
74 "type" : <id | email | uri>, # optional, default is id
75 "source_id": <id>,
76 "dest_id": <id>
77 } ... ]
78 "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
79 # final object name will be target_path + "/" + obj
80 },
81 "connections": [
82 {
83 "id": <id>,
84 "access_key": <access>,
85 "secret": <secret>,
86 "endpoint": <endpoint>,
87 } ... ],
88 "acl_profiles": [
89 {
90 "id": <id>, # acl mappings
91 "acls": [ {
92 "type": <id | email | uri>,
93 "source_id": <id>,
94 "dest_id": <id>
95 } ... ]
96 }
97 ],
98 "profiles": [
99 {
100 "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
101 "target_path": <dest>, # (override default)
102 "connection_id": <connection_id>, # optional, if empty references default connection
103 "acls_id": <mappings_id>, # optional, if empty references default mappings
104 } ... ],
105 }
106
107 target path optional variables:
108
109 (evaluated at init)
110 sid: sync instance id, randomly generated by sync process on first sync initialization
111 zonegroup: zonegroup name
112 zonegroup_id: zonegroup id
113 zone: zone name
114 zone_id: zone id
115
116 (evaluated when syncing)
117 bucket: bucket name
118 owner: bucket owner
119
120 */
121
122 struct ACLMapping {
123 ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
124 string source_id;
125 string dest_id;
126
127 ACLMapping() = default;
128
129 ACLMapping(ACLGranteeTypeEnum t,
130 const string& s,
131 const string& d) : type(t),
132 source_id(s),
133 dest_id(d) {}
134
135 void init(const JSONFormattable& config) {
136 const string& t = config["type"];
137
138 if (t == "email") {
139 type = ACL_TYPE_EMAIL_USER;
140 } else if (t == "uri") {
141 type = ACL_TYPE_GROUP;
142 } else {
143 type = ACL_TYPE_CANON_USER;
144 }
145
146 source_id = config["source_id"];
147 dest_id = config["dest_id"];
148 }
149
150 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
151 Formatter::ObjectSection os(jf, "acl_mapping");
152 string s;
153 switch (type) {
154 case ACL_TYPE_EMAIL_USER:
155 s = "email";
156 break;
157 case ACL_TYPE_GROUP:
158 s = "uri";
159 break;
160 default:
161 s = "id";
162 break;
163 }
164 encode_json("type", s, &jf);
165 encode_json("source_id", source_id, &jf);
166 encode_json("dest_id", dest_id, &jf);
167 }
168 };
169
170 struct ACLMappings {
171 map<string, ACLMapping> acl_mappings;
172
173 void init(const JSONFormattable& config) {
174 for (auto& c : config.array()) {
175 ACLMapping m;
176 m.init(c);
177
178 acl_mappings.emplace(std::make_pair(m.source_id, m));
179 }
180 }
181 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
182 Formatter::ArraySection os(jf, "acls");
183
184 for (auto& i : acl_mappings) {
185 i.second.dump_conf(cct, jf);
186 }
187 }
188 };
189
190 struct AWSSyncConfig_ACLProfiles {
191 map<string, std::shared_ptr<ACLMappings> > acl_profiles;
192
193 void init(const JSONFormattable& config) {
194 for (auto& c : config.array()) {
195 const string& profile_id = c["id"];
196
197 std::shared_ptr<ACLMappings> ap{new ACLMappings};
198 ap->init(c["acls"]);
199
200 acl_profiles[profile_id] = ap;
201 }
202 }
203 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
204 Formatter::ArraySection section(jf, "acl_profiles");
205
206 for (auto& p : acl_profiles) {
207 Formatter::ObjectSection section(jf, "profile");
208 encode_json("id", p.first, &jf);
209 p.second->dump_conf(cct, jf);
210 }
211 }
212
213 bool find(const string& profile_id, ACLMappings *result) const {
214 auto iter = acl_profiles.find(profile_id);
215 if (iter == acl_profiles.end()) {
216 return false;
217 }
218 *result = *iter->second;
219 return true;
220 }
221 };
222
223 struct AWSSyncConfig_Connection {
224 string connection_id;
225 string endpoint;
226 RGWAccessKey key;
227 std::optional<string> region;
228 HostStyle host_style{PathStyle};
229
230 bool has_endpoint{false};
231 bool has_key{false};
232 bool has_host_style{false};
233
234 void init(const JSONFormattable& config) {
235 has_endpoint = config.exists("endpoint");
236 has_key = config.exists("access_key") || config.exists("secret");
237 has_host_style = config.exists("host_style");
238
239 connection_id = config["id"];
240 endpoint = config["endpoint"];
241
242 key = RGWAccessKey(config["access_key"], config["secret"]);
243
244 if (config.exists("region")) {
245 region = config["region"];
246 } else {
247 region.reset();
248 }
249
250 string host_style_str = config["host_style"];
251 if (host_style_str != "virtual") {
252 host_style = PathStyle;
253 } else {
254 host_style = VirtualStyle;
255 }
256 }
257 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
258 Formatter::ObjectSection section(jf, "connection");
259 encode_json("id", connection_id, &jf);
260 encode_json("endpoint", endpoint, &jf);
261 string s = (host_style == PathStyle ? "path" : "virtual");
262 encode_json("region", region, &jf);
263 encode_json("host_style", s, &jf);
264
265 {
266 Formatter::ObjectSection os(jf, "key");
267 encode_json("access_key", key.id, &jf);
268 string secret = (key.key.empty() ? "" : "******");
269 encode_json("secret", secret, &jf);
270 }
271 }
272 };
273
274 static int conf_to_uint64(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
275 {
276 string sval;
277 if (config.find(key, &sval)) {
278 string err;
279 uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
280 if (!err.empty()) {
281 ldpp_dout(dpp, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
282 return -EINVAL;
283 }
284 *pval = val;
285 }
286 return 0;
287 }
288
289 struct AWSSyncConfig_S3 {
290 uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
291 uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
292
293 int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
294 int r = conf_to_uint64(dpp, cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
295 if (r < 0) {
296 return r;
297 }
298
299 r = conf_to_uint64(dpp, cct, config, "multipart_min_part_size", &multipart_min_part_size);
300 if (r < 0) {
301 return r;
302 }
303 #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
304 if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
305 multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
306 }
307 return 0;
308 }
309
310 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
311 Formatter::ObjectSection section(jf, "s3");
312 encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf);
313 encode_json("multipart_min_part_size", multipart_min_part_size, &jf);
314 }
315 };
316
317 struct AWSSyncConfig_Profile {
318 string source_bucket;
319 bool prefix{false};
320 string target_path;
321 string connection_id;
322 string acls_id;
323
324 std::shared_ptr<AWSSyncConfig_Connection> conn_conf;
325 std::shared_ptr<ACLMappings> acls;
326
327 std::shared_ptr<RGWRESTConn> conn;
328
329 void init(const JSONFormattable& config) {
330 source_bucket = config["source_bucket"];
331
332 prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*');
333
334 if (prefix) {
335 source_bucket = source_bucket.substr(0, source_bucket.size() - 1);
336 }
337
338 target_path = config["target_path"];
339 connection_id = config["connection_id"];
340 acls_id = config["acls_id"];
341
342 if (config.exists("connection")) {
343 conn_conf = make_shared<AWSSyncConfig_Connection>();
344 conn_conf->init(config["connection"]);
345 }
346
347 if (config.exists("acls")) {
348 acls = make_shared<ACLMappings>();
349 acls->init(config["acls"]);
350 }
351 }
352
353 void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const {
354 Formatter::ObjectSection config(jf, section);
355 string sb{source_bucket};
356 if (prefix) {
357 sb.append("*");
358 }
359 encode_json("source_bucket", sb, &jf);
360 encode_json("target_path", target_path, &jf);
361 encode_json("connection_id", connection_id, &jf);
362 encode_json("acls_id", acls_id, &jf);
363 if (conn_conf.get()) {
364 conn_conf->dump_conf(cct, jf);
365 }
366 if (acls.get()) {
367 acls->dump_conf(cct, jf);
368 }
369 }
370 };
371
/*
 * Write `src` to *dest with every occurrence of `find` replaced by `replace`.
 *
 * Scanning resumes right after each inserted replacement. That means text
 * that came from `replace` is never rescanned, so there is no endless loop
 * when `replace` contains `find`. It also means occurrences that follow a
 * replacement shorter than `find` are not skipped.
 *
 * `src` and *dest may be the same string. An empty `find` leaves the text
 * unchanged.
 */
static void find_and_replace(const std::string& src, const std::string& find, const std::string& replace, std::string *dest)
{
  std::string s = src;

  if (!find.empty()) {
    for (size_t pos = s.find(find); pos != std::string::npos;
         pos = s.find(find, pos + replace.size())) {
      s.replace(pos, find.size(), replace);
    }
  }

  *dest = std::move(s);
}
385
386 static void apply_meta_param(const string& src, const string& param, const string& val, string *dest)
387 {
388 string s = string("${") + param + "}";
389 find_and_replace(src, s, val, dest);
390 }
391
392
393 struct AWSSyncConfig {
394 AWSSyncConfig_Profile default_profile;
395 std::shared_ptr<AWSSyncConfig_Profile> root_profile;
396
397 map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections;
398 AWSSyncConfig_ACLProfiles acl_profiles;
399
400 map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles;
401
402 AWSSyncConfig_S3 s3;
403
404 int init_profile(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
405 bool connection_must_exist) {
406 if (!profile.connection_id.empty()) {
407 if (profile.conn_conf) {
408 ldpp_dout(dpp, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
409 return -EINVAL;
410 }
411 if (connections.find(profile.connection_id) == connections.end()) {
412 ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
413 return -EINVAL;
414 }
415 profile.conn_conf = connections[profile.connection_id];
416 } else if (!profile.conn_conf) {
417 profile.connection_id = default_profile.connection_id;
418 auto i = connections.find(profile.connection_id);
419 if (i != connections.end()) {
420 profile.conn_conf = i->second;
421 }
422 }
423
424 if (connection_must_exist && !profile.conn_conf) {
425 ldpp_dout(dpp, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
426 return -EINVAL;
427 }
428
429 if (profile.conn_conf && default_profile.conn_conf) {
430 if (!profile.conn_conf->has_endpoint) {
431 profile.conn_conf->endpoint = default_profile.conn_conf->endpoint;
432 }
433 if (!profile.conn_conf->has_host_style) {
434 profile.conn_conf->host_style = default_profile.conn_conf->host_style;
435 }
436 if (!profile.conn_conf->has_key) {
437 profile.conn_conf->key = default_profile.conn_conf->key;
438 }
439 }
440
441 ACLMappings acl_mappings;
442
443 if (!profile.acls_id.empty()) {
444 if (!acl_profiles.find(profile.acls_id, &acl_mappings)) {
445 ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
446 return -EINVAL;
447 }
448 profile.acls = acl_profiles.acl_profiles[profile.acls_id];
449 } else if (!profile.acls) {
450 if (default_profile.acls) {
451 profile.acls = default_profile.acls;
452 profile.acls_id = default_profile.acls_id;
453 }
454 }
455
456 if (profile.target_path.empty()) {
457 profile.target_path = default_profile.target_path;
458 }
459 if (profile.target_path.empty()) {
460 profile.target_path = default_target_path;
461 }
462
463 return 0;
464 }
465
466 int init_target(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
467 std::shared_ptr<AWSSyncConfig_Profile> profile;
468 profile.reset(new AWSSyncConfig_Profile);
469 profile->init(profile_conf);
470
471 int ret = init_profile(dpp, cct, profile_conf, *profile, true);
472 if (ret < 0) {
473 return ret;
474 }
475
476 auto& sb = profile->source_bucket;
477
478 if (explicit_profiles.find(sb) != explicit_profiles.end()) {
479 ldpp_dout(dpp, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
480 }
481
482 explicit_profiles[sb] = profile;
483 if (ptarget) {
484 *ptarget = profile;
485 }
486 return 0;
487 }
488
489 bool do_find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
490 const string& name = bucket.name;
491 auto iter = explicit_profiles.upper_bound(name);
492 if (iter == explicit_profiles.begin()) {
493 return false;
494 }
495
496 --iter;
497 if (iter->first.size() > name.size()) {
498 return false;
499 }
500 if (name.compare(0, iter->first.size(), iter->first) != 0) {
501 return false;
502 }
503
504 std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second;
505
506 if (!target->prefix &&
507 name.size() != iter->first.size()) {
508 return false;
509 }
510
511 *result = target;
512 return true;
513 }
514
515 void find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
516 if (!do_find_profile(bucket, result)) {
517 *result = root_profile;
518 }
519 }
520
521 AWSSyncConfig() {}
522
523 int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
524 auto& default_conf = config["default"];
525
526 if (config.exists("default")) {
527 default_profile.init(default_conf);
528 init_profile(dpp, cct, default_conf, default_profile, false);
529 }
530
531 for (auto& conn : config["connections"].array()) {
532 auto new_conn = conn;
533
534 std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection};
535 c->init(new_conn);
536
537 connections[new_conn["id"]] = c;
538 }
539
540 acl_profiles.init(config["acl_profiles"]);
541
542 int r = s3.init(dpp, cct, config["s3"]);
543 if (r < 0) {
544 return r;
545 }
546
547 auto new_root_conf = config;
548
549 r = init_target(dpp, cct, new_root_conf, &root_profile); /* the root profile config */
550 if (r < 0) {
551 return r;
552 }
553
554 for (auto target_conf : config["profiles"].array()) {
555 int r = init_target(dpp, cct, target_conf, nullptr);
556 if (r < 0) {
557 return r;
558 }
559 }
560
561 JSONFormatter jf(true);
562 dump_conf(cct, jf);
563 stringstream ss;
564 jf.flush(ss);
565
566 ldpp_dout(dpp, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;
567
568 return 0;
569 }
570
571 void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) {
572 apply_meta_param(path, "sid", sid, dest);
573
574 const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup();
575 apply_meta_param(path, "zonegroup", zg.get_name(), dest);
576 apply_meta_param(path, "zonegroup_id", zg.get_id(), dest);
577
578 const RGWZone& zone = sc->env->svc->zone->get_zone();
579 apply_meta_param(path, "zone", zone.name, dest);
580 apply_meta_param(path, "zone_id", zone.id, dest);
581 }
582
583 void update_config(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, const string& sid) {
584 expand_target(sc, sid, root_profile->target_path, &root_profile->target_path);
585 ldpp_dout(dpp, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
586 for (auto& t : explicit_profiles) {
587 expand_target(sc, sid, t.second->target_path, &t.second->target_path);
588 ldpp_dout(dpp, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
589 }
590 }
591
592 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
593 Formatter::ObjectSection config(jf, "config");
594 root_profile->dump_conf(cct, jf);
595 jf.open_array_section("connections");
596 for (auto c : connections) {
597 c.second->dump_conf(cct, jf);
598 }
599 jf.close_section();
600
601 acl_profiles.dump_conf(cct, jf);
602
603 { // targets
604 Formatter::ArraySection as(jf, "profiles");
605 for (auto& t : explicit_profiles) {
606 Formatter::ObjectSection target_section(jf, "profile");
607 encode_json("name", t.first, &jf);
608 t.second->dump_conf(cct, jf);
609 }
610 }
611 }
612
613 string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile,
614 const RGWBucketInfo& bucket_info,
615 const rgw_obj_key& obj) {
616 string bucket_str;
617 string owner;
618 if (!bucket_info.owner.tenant.empty()) {
619 bucket_str = owner = bucket_info.owner.tenant + "-";
620 owner += bucket_info.owner.id;
621 }
622 bucket_str += bucket_info.bucket.name;
623
624 const string& path = profile->target_path;
625
626 string new_path;
627 apply_meta_param(path, "bucket", bucket_str, &new_path);
628 apply_meta_param(new_path, "owner", owner, &new_path);
629
630 new_path += string("/") + get_key_oid(obj);
631
632 return new_path;
633 }
634
635 void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile,
636 const RGWBucketInfo& bucket_info,
637 const rgw_obj_key& obj,
638 string *bucket_name,
639 string *obj_name) {
640 string path = get_path(profile, bucket_info, obj);
641 size_t pos = path.find('/');
642
643 *bucket_name = path.substr(0, pos);
644 *obj_name = path.substr(pos + 1);
645 }
646
647 void init_conns(RGWDataSyncCtx *sc, const string& id) {
648 auto sync_env = sc->env;
649
650 update_config(sync_env->dpp, sc, id);
651
652 auto& root_conf = root_profile->conn_conf;
653
654 root_profile->conn.reset(new S3RESTConn(sc->cct,
655 id,
656 { root_conf->endpoint },
657 root_conf->key,
658 sync_env->svc->zone->get_zonegroup().get_id(),
659 root_conf->region,
660 root_conf->host_style));
661
662 for (auto i : explicit_profiles) {
663 auto& c = i.second;
664
665 c->conn.reset(new S3RESTConn(sc->cct,
666 id,
667 { c->conn_conf->endpoint },
668 c->conn_conf->key,
669 sync_env->svc->zone->get_zonegroup().get_id(),
670 c->conn_conf->region,
671 c->conn_conf->host_style));
672 }
673 }
674 };
675
676
677 struct AWSSyncInstanceEnv {
678 AWSSyncConfig conf;
679 string id;
680
681 explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {}
682
683 void init(RGWDataSyncCtx *sc, uint64_t instance_id) {
684 char buf[32];
685 snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
686 id = buf;
687
688 conf.init_conns(sc, id);
689 }
690
691 void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
692 conf.find_profile(bucket, ptarget);
693 ceph_assert(ptarget);
694 }
695 };
696
697 static int do_decode_rest_obj(const DoutPrefixProvider *dpp, CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
698 {
699 for (auto header : headers) {
700 const string& val = header.second;
701 if (header.first == "RGWX_OBJECT_SIZE") {
702 info->content_len = atoi(val.c_str());
703 } else {
704 info->attrs[header.first] = val;
705 }
706 }
707
708 info->acls.set_ctx(cct);
709 auto aiter = attrs.find(RGW_ATTR_ACL);
710 if (aiter != attrs.end()) {
711 bufferlist& bl = aiter->second;
712 auto bliter = bl.cbegin();
713 try {
714 info->acls.decode(bliter);
715 } catch (buffer::error& err) {
716 ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
717 return -EIO;
718 }
719 } else {
720 ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
721 }
722
723 return 0;
724 }
725
726 class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
727 {
728 RGWDataSyncCtx *sc;
729 RGWRESTConn *conn;
730 const rgw_obj& src_obj;
731 RGWRESTConn::get_obj_params req_params;
732
733 rgw_sync_aws_src_obj_properties src_properties;
734 public:
735 RGWRESTStreamGetCRF(CephContext *_cct,
736 RGWCoroutinesEnv *_env,
737 RGWCoroutine *_caller,
738 RGWDataSyncCtx *_sc,
739 RGWRESTConn *_conn,
740 const rgw_obj& _src_obj,
741 const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller,
742 _sc->env->http_manager, _src_obj.key),
743 sc(_sc), conn(_conn), src_obj(_src_obj),
744 src_properties(_src_properties) {
745 }
746
747 int init(const DoutPrefixProvider *dpp) override {
748 /* init input connection */
749
750
751 req_params.get_op = true;
752 req_params.prepend_metadata = true;
753
754 req_params.unmod_ptr = &src_properties.mtime;
755 req_params.etag = src_properties.etag;
756 req_params.mod_zone_id = src_properties.zone_short_id;
757 req_params.mod_pg_ver = src_properties.pg_ver;
758
759 if (range.is_set) {
760 req_params.range_is_set = true;
761 req_params.range_start = range.ofs;
762 req_params.range_end = range.ofs + range.size - 1;
763 }
764
765 RGWRESTStreamRWRequest *in_req;
766 int ret = conn->get_obj(dpp, src_obj, req_params, false /* send */, &in_req);
767 if (ret < 0) {
768 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
769 return ret;
770 }
771
772 set_req(in_req);
773
774 return RGWStreamReadHTTPResourceCRF::init(dpp);
775 }
776
777 int decode_rest_obj(const DoutPrefixProvider *dpp, map<string, string>& headers, bufferlist& extra_data) override {
778 map<string, bufferlist> src_attrs;
779
780 ldpp_dout(dpp, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
781
782 if (extra_data.length() > 0) {
783 JSONParser jp;
784 if (!jp.parse(extra_data.c_str(), extra_data.length())) {
785 ldpp_dout(dpp, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
786 return -EIO;
787 }
788
789 JSONDecoder::decode_json("attrs", src_attrs, &jp);
790 }
791 return do_decode_rest_obj(dpp, sc->cct, src_attrs, headers, &rest_obj);
792 }
793
794 bool need_extra_data() override {
795 return true;
796 }
797 };
798
// Response headers (besides the X_AMZ_* family) that are forwarded to the
// destination object.
static std::set<std::string> keep_headers = {
  "CONTENT_TYPE",
  "CONTENT_ENCODING",
  "CONTENT_DISPOSITION",
  "CONTENT_LANGUAGE",
};
803
804 class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
805 {
806 RGWDataSyncCtx *sc;
807 rgw_sync_aws_src_obj_properties src_properties;
808 std::shared_ptr<AWSSyncConfig_Profile> target;
809 const rgw_obj& dest_obj;
810 string etag;
811 public:
812 RGWAWSStreamPutCRF(CephContext *_cct,
813 RGWCoroutinesEnv *_env,
814 RGWCoroutine *_caller,
815 RGWDataSyncCtx *_sc,
816 const rgw_sync_aws_src_obj_properties& _src_properties,
817 std::shared_ptr<AWSSyncConfig_Profile>& _target,
818 const rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager),
819 sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) {
820 }
821
822 int init() override {
823 /* init output connection */
824 RGWRESTStreamS3PutObj *out_req{nullptr};
825
826 if (multipart.is_multipart) {
827 char buf[32];
828 snprintf(buf, sizeof(buf), "%d", multipart.part_num);
829 rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
830 { "partNumber", buf },
831 { nullptr, nullptr } };
832 target->conn->put_obj_send_init(dest_obj, params, &out_req);
833 } else {
834 target->conn->put_obj_send_init(dest_obj, nullptr, &out_req);
835 }
836
837 set_req(out_req);
838
839 return RGWStreamWriteHTTPResourceCRF::init();
840 }
841
842 static bool keep_attr(const string& h) {
843 return (keep_headers.find(h) != keep_headers.end() ||
844 boost::algorithm::starts_with(h, "X_AMZ_"));
845 }
846
847 static void init_send_attrs(const DoutPrefixProvider *dpp,
848 CephContext *cct,
849 const rgw_rest_obj& rest_obj,
850 const rgw_sync_aws_src_obj_properties& src_properties,
851 const AWSSyncConfig_Profile *target,
852 map<string, string> *attrs) {
853 auto& new_attrs = *attrs;
854
855 new_attrs.clear();
856
857 for (auto& hi : rest_obj.attrs) {
858 if (keep_attr(hi.first)) {
859 new_attrs.insert(hi);
860 }
861 }
862
863 auto acl = rest_obj.acls.get_acl();
864
865 map<int, vector<string> > access_map;
866
867 if (target->acls) {
868 for (auto& grant : acl.get_grant_map()) {
869 auto& orig_grantee = grant.first;
870 auto& perm = grant.second;
871
872 string grantee;
873
874 const auto& am = target->acls->acl_mappings;
875
876 auto iter = am.find(orig_grantee);
877 if (iter == am.end()) {
878 ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
879 continue;
880 }
881
882 grantee = iter->second.dest_id;
883
884 string type;
885
886 switch (iter->second.type) {
887 case ACL_TYPE_CANON_USER:
888 type = "id";
889 break;
890 case ACL_TYPE_EMAIL_USER:
891 type = "emailAddress";
892 break;
893 case ACL_TYPE_GROUP:
894 type = "uri";
895 break;
896 default:
897 continue;
898 }
899
900 string tv = type + "=" + grantee;
901
902 int flags = perm.get_permission().get_permissions();
903 if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
904 access_map[flags].push_back(tv);
905 continue;
906 }
907
908 for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
909 if (flags & i) {
910 access_map[i].push_back(tv);
911 }
912 }
913 }
914 }
915
916 for (auto aiter : access_map) {
917 int grant_type = aiter.first;
918
919 string header_str("x-amz-grant-");
920
921 switch (grant_type) {
922 case RGW_PERM_READ:
923 header_str.append("read");
924 break;
925 case RGW_PERM_WRITE:
926 header_str.append("write");
927 break;
928 case RGW_PERM_READ_ACP:
929 header_str.append("read-acp");
930 break;
931 case RGW_PERM_WRITE_ACP:
932 header_str.append("write-acp");
933 break;
934 case RGW_PERM_FULL_CONTROL:
935 header_str.append("full-control");
936 break;
937 }
938
939 string s;
940
941 for (auto viter : aiter.second) {
942 if (!s.empty()) {
943 s.append(", ");
944 }
945 s.append(viter);
946 }
947
948 ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
949
950 new_attrs[header_str] = s;
951 }
952
953 char buf[32];
954 snprintf(buf, sizeof(buf), "%llu", (long long)src_properties.versioned_epoch);
955 new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
956
957 utime_t ut(src_properties.mtime);
958 snprintf(buf, sizeof(buf), "%lld.%09lld",
959 (long long)ut.sec(),
960 (long long)ut.nsec());
961
962 new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
963 new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag;
964 new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
965 if (!rest_obj.key.instance.empty()) {
966 new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
967 }
968 }
969
970 void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
971 RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
972
973 map<string, string> new_attrs;
974 if (!multipart.is_multipart) {
975 init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
976 }
977
978 r->set_send_length(rest_obj.content_len);
979
980 RGWAccessControlPolicy policy;
981
982 r->send_ready(dpp, target->conn->get_key(), new_attrs, policy);
983 }
984
985 void handle_headers(const map<string, string>& headers) {
986 for (auto h : headers) {
987 if (h.first == "ETAG") {
988 etag = h.second;
989 }
990 }
991 }
992
993 bool get_etag(string *petag) {
994 if (etag.empty()) {
995 return false;
996 }
997 *petag = etag;
998 return true;
999 }
1000 };
1001
1002
1003 class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
1004 RGWDataSyncCtx *sc;
1005 RGWRESTConn *source_conn;
1006 std::shared_ptr<AWSSyncConfig_Profile> target;
1007 const rgw_obj& src_obj;
1008 const rgw_obj& dest_obj;
1009
1010 rgw_sync_aws_src_obj_properties src_properties;
1011
1012 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
1013 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
1014
1015 public:
1016 RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc,
1017 RGWRESTConn *_source_conn,
1018 const rgw_obj& _src_obj,
1019 const rgw_sync_aws_src_obj_properties& _src_properties,
1020 std::shared_ptr<AWSSyncConfig_Profile> _target,
1021 const rgw_obj& _dest_obj) : RGWCoroutine(_sc->cct),
1022 sc(_sc),
1023 source_conn(_source_conn),
1024 target(_target),
1025 src_obj(_src_obj),
1026 dest_obj(_dest_obj),
1027 src_properties(_src_properties) {}
1028
1029 int operate(const DoutPrefixProvider *dpp) override {
1030 reenter(this) {
1031 /* init input */
1032 in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
1033 source_conn, src_obj,
1034 src_properties));
1035
1036 /* init output */
1037 out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
1038 src_properties, target, dest_obj));
1039
1040 yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
1041 if (retcode < 0) {
1042 return set_cr_error(retcode);
1043 }
1044
1045 return set_cr_done();
1046 }
1047
1048 return 0;
1049 }
1050 };
1051
1052 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
1053 RGWDataSyncCtx *sc;
1054 RGWRESTConn *source_conn;
1055 std::shared_ptr<AWSSyncConfig_Profile> target;
1056 const rgw_obj& src_obj;
1057 const rgw_obj& dest_obj;
1058
1059 rgw_sync_aws_src_obj_properties src_properties;
1060
1061 string upload_id;
1062
1063 rgw_sync_aws_multipart_part_info part_info;
1064
1065 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
1066 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
1067
1068 string *petag;
1069
1070 public:
1071 RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc,
1072 RGWRESTConn *_source_conn,
1073 const rgw_obj& _src_obj,
1074 std::shared_ptr<AWSSyncConfig_Profile>& _target,
1075 const rgw_obj& _dest_obj,
1076 const rgw_sync_aws_src_obj_properties& _src_properties,
1077 const string& _upload_id,
1078 const rgw_sync_aws_multipart_part_info& _part_info,
1079 string *_petag) : RGWCoroutine(_sc->cct),
1080 sc(_sc),
1081 source_conn(_source_conn),
1082 target(_target),
1083 src_obj(_src_obj),
1084 dest_obj(_dest_obj),
1085 src_properties(_src_properties),
1086 upload_id(_upload_id),
1087 part_info(_part_info),
1088 petag(_petag) {}
1089
1090 int operate(const DoutPrefixProvider *dpp) override {
1091 reenter(this) {
1092 /* init input */
1093 in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
1094 source_conn, src_obj,
1095 src_properties));
1096
1097 in_crf->set_range(part_info.ofs, part_info.size);
1098
1099 /* init output */
1100 out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
1101 src_properties, target, dest_obj));
1102
1103 out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
1104
1105 yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
1106 if (retcode < 0) {
1107 return set_cr_error(retcode);
1108 }
1109
1110 if (!(static_cast<RGWAWSStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
1111 ldpp_dout(dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl;
1112 return set_cr_error(-EIO);
1113 }
1114
1115 return set_cr_done();
1116 }
1117
1118 return 0;
1119 }
1120 };
1121
1122 class RGWAWSAbortMultipartCR : public RGWCoroutine {
1123 RGWDataSyncCtx *sc;
1124 RGWRESTConn *dest_conn;
1125 const rgw_obj& dest_obj;
1126
1127 string upload_id;
1128
1129 public:
1130 RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc,
1131 RGWRESTConn *_dest_conn,
1132 const rgw_obj& _dest_obj,
1133 const string& _upload_id) : RGWCoroutine(_sc->cct),
1134 sc(_sc),
1135 dest_conn(_dest_conn),
1136 dest_obj(_dest_obj),
1137 upload_id(_upload_id) {}
1138
1139 int operate(const DoutPrefixProvider *dpp) override {
1140 reenter(this) {
1141
1142 yield {
1143 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
1144 bufferlist bl;
1145 call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager,
1146 obj_to_aws_path(dest_obj), params));
1147 }
1148
1149 if (retcode < 0) {
1150 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
1151 return set_cr_error(retcode);
1152 }
1153
1154 return set_cr_done();
1155 }
1156
1157 return 0;
1158 }
1159 };
1160
1161 class RGWAWSInitMultipartCR : public RGWCoroutine {
1162 RGWDataSyncCtx *sc;
1163 RGWRESTConn *dest_conn;
1164 const rgw_obj& dest_obj;
1165
1166 uint64_t obj_size;
1167 map<string, string> attrs;
1168
1169 bufferlist out_bl;
1170
1171 string *upload_id;
1172
1173 struct InitMultipartResult {
1174 string bucket;
1175 string key;
1176 string upload_id;
1177
1178 void decode_xml(XMLObj *obj) {
1179 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
1180 RGWXMLDecoder::decode_xml("Key", key, obj);
1181 RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
1182 }
1183 } result;
1184
1185 public:
1186 RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc,
1187 RGWRESTConn *_dest_conn,
1188 const rgw_obj& _dest_obj,
1189 uint64_t _obj_size,
1190 const map<string, string>& _attrs,
1191 string *_upload_id) : RGWCoroutine(_sc->cct),
1192 sc(_sc),
1193 dest_conn(_dest_conn),
1194 dest_obj(_dest_obj),
1195 obj_size(_obj_size),
1196 attrs(_attrs),
1197 upload_id(_upload_id) {}
1198
1199 int operate(const DoutPrefixProvider *dpp) override {
1200 reenter(this) {
1201
1202 yield {
1203 rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
1204 bufferlist bl;
1205 call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
1206 obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
1207 }
1208
1209 if (retcode < 0) {
1210 ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
1211 return set_cr_error(retcode);
1212 }
1213 {
1214 /*
1215 * If one of the following fails we cannot abort upload, as we cannot
1216 * extract the upload id. If one of these fail it's very likely that that's
1217 * the least of our problem.
1218 */
1219 RGWXMLDecoder::XMLParser parser;
1220 if (!parser.init()) {
1221 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1222 return set_cr_error(-EIO);
1223 }
1224
1225 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1226 string str(out_bl.c_str(), out_bl.length());
1227 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
1228 return set_cr_error(-EIO);
1229 }
1230
1231 try {
1232 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
1233 } catch (RGWXMLDecoder::err& err) {
1234 string str(out_bl.c_str(), out_bl.length());
1235 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1236 return set_cr_error(-EIO);
1237 }
1238 }
1239
1240 ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
1241
1242 *upload_id = result.upload_id;
1243
1244 return set_cr_done();
1245 }
1246
1247 return 0;
1248 }
1249 };
1250
1251 class RGWAWSCompleteMultipartCR : public RGWCoroutine {
1252 RGWDataSyncCtx *sc;
1253 RGWRESTConn *dest_conn;
1254 const rgw_obj& dest_obj;
1255
1256 bufferlist out_bl;
1257
1258 string upload_id;
1259
1260 struct CompleteMultipartReq {
1261 map<int, rgw_sync_aws_multipart_part_info> parts;
1262
1263 explicit CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}
1264
1265 void dump_xml(Formatter *f) const {
1266 for (auto p : parts) {
1267 f->open_object_section("Part");
1268 encode_xml("PartNumber", p.first, f);
1269 encode_xml("ETag", p.second.etag, f);
1270 f->close_section();
1271 };
1272 }
1273 } req_enc;
1274
1275 struct CompleteMultipartResult {
1276 string location;
1277 string bucket;
1278 string key;
1279 string etag;
1280
1281 void decode_xml(XMLObj *obj) {
1282 RGWXMLDecoder::decode_xml("Location", bucket, obj);
1283 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
1284 RGWXMLDecoder::decode_xml("Key", key, obj);
1285 RGWXMLDecoder::decode_xml("ETag", etag, obj);
1286 }
1287 } result;
1288
1289 public:
1290 RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc,
1291 RGWRESTConn *_dest_conn,
1292 const rgw_obj& _dest_obj,
1293 string _upload_id,
1294 const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sc->cct),
1295 sc(_sc),
1296 dest_conn(_dest_conn),
1297 dest_obj(_dest_obj),
1298 upload_id(_upload_id),
1299 req_enc(_parts) {}
1300
1301 int operate(const DoutPrefixProvider *dpp) override {
1302 reenter(this) {
1303
1304 yield {
1305 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
1306 stringstream ss;
1307 XMLFormatter formatter;
1308
1309 encode_xml("CompleteMultipartUpload", req_enc, &formatter);
1310
1311 formatter.flush(ss);
1312
1313 bufferlist bl;
1314 bl.append(ss.str());
1315
1316 call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
1317 obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
1318 }
1319
1320 if (retcode < 0) {
1321 ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
1322 return set_cr_error(retcode);
1323 }
1324 {
1325 /*
1326 * If one of the following fails we cannot abort upload, as we cannot
1327 * extract the upload id. If one of these fail it's very likely that that's
1328 * the least of our problem.
1329 */
1330 RGWXMLDecoder::XMLParser parser;
1331 if (!parser.init()) {
1332 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1333 return set_cr_error(-EIO);
1334 }
1335
1336 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1337 string str(out_bl.c_str(), out_bl.length());
1338 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
1339 return set_cr_error(-EIO);
1340 }
1341
1342 try {
1343 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
1344 } catch (RGWXMLDecoder::err& err) {
1345 string str(out_bl.c_str(), out_bl.length());
1346 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1347 return set_cr_error(-EIO);
1348 }
1349 }
1350
1351 ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
1352
1353 return set_cr_done();
1354 }
1355
1356 return 0;
1357 }
1358 };
1359
1360
1361 class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
1362 RGWDataSyncCtx *sc;
1363 RGWRESTConn *dest_conn;
1364 const rgw_obj& dest_obj;
1365 const rgw_raw_obj status_obj;
1366
1367 string upload_id;
1368
1369 public:
1370
1371 RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc,
1372 RGWRESTConn *_dest_conn,
1373 const rgw_obj& _dest_obj,
1374 const rgw_raw_obj& _status_obj,
1375 const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc),
1376 dest_conn(_dest_conn),
1377 dest_obj(_dest_obj),
1378 status_obj(_status_obj),
1379 upload_id(_upload_id) {}
1380
1381 int operate(const DoutPrefixProvider *dpp) override {
1382 reenter(this) {
1383 yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id));
1384 if (retcode < 0) {
1385 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
1386 /* ignore error, best effort */
1387 }
1388 yield call(new RGWRadosRemoveCR(sc->env->driver, status_obj));
1389 if (retcode < 0) {
1390 ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
1391 /* ignore error, best effort */
1392 }
1393 return set_cr_done();
1394 }
1395
1396 return 0;
1397 }
1398 };
1399
1400 class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
1401 RGWDataSyncCtx *sc;
1402 RGWDataSyncEnv *sync_env;
1403 AWSSyncConfig& conf;
1404 RGWRESTConn *source_conn;
1405 std::shared_ptr<AWSSyncConfig_Profile> target;
1406 const rgw_obj& src_obj;
1407 const rgw_obj& dest_obj;
1408
1409 uint64_t obj_size;
1410 string src_etag;
1411 rgw_sync_aws_src_obj_properties src_properties;
1412 rgw_rest_obj rest_obj;
1413
1414 rgw_sync_aws_multipart_upload_info status;
1415
1416 map<string, string> new_attrs;
1417
1418 rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
1419
1420 int ret_err{0};
1421
1422 rgw_raw_obj status_obj;
1423
1424 public:
1425 RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc,
1426 rgw_bucket_sync_pipe& _sync_pipe,
1427 AWSSyncConfig& _conf,
1428 RGWRESTConn *_source_conn,
1429 const rgw_obj& _src_obj,
1430 std::shared_ptr<AWSSyncConfig_Profile>& _target,
1431 const rgw_obj& _dest_obj,
1432 uint64_t _obj_size,
1433 const rgw_sync_aws_src_obj_properties& _src_properties,
1434 const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct),
1435 sc(_sc),
1436 sync_env(_sc->env),
1437 conf(_conf),
1438 source_conn(_source_conn),
1439 target(_target),
1440 src_obj(_src_obj),
1441 dest_obj(_dest_obj),
1442 obj_size(_obj_size),
1443 src_properties(_src_properties),
1444 rest_obj(_rest_obj),
1445 status_obj(sync_env->svc->zone->get_zone_params().log_pool,
1446 RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) {
1447 }
1448
1449
1450 int operate(const DoutPrefixProvider *dpp) override {
1451 reenter(this) {
1452 yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(
1453 dpp, sync_env->driver, status_obj, &status, false));
1454
1455 if (retcode < 0 && retcode != -ENOENT) {
1456 ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
1457 return retcode;
1458 }
1459
1460 if (retcode >= 0) {
1461 /* check here that mtime and size did not change */
1462
1463 if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
1464 status.src_properties.etag != src_properties.etag) {
1465 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
1466 retcode = -ENOENT;
1467 }
1468 }
1469
1470 if (retcode == -ENOENT) {
1471 RGWAWSStreamPutCRF::init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
1472
1473 yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id));
1474 if (retcode < 0) {
1475 return set_cr_error(retcode);
1476 }
1477
1478 status.obj_size = obj_size;
1479 status.src_properties = src_properties;
1480 #define MULTIPART_MAX_PARTS 10000
1481 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
1482 status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
1483 status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
1484 status.cur_part = 1;
1485 }
1486
1487 for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
1488 yield {
1489 rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
1490 cur_part_info.part_num = status.cur_part;
1491 cur_part_info.ofs = status.cur_ofs;
1492 cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
1493
1494 pcur_part_info = &cur_part_info;
1495
1496 status.cur_ofs += status.part_size;
1497
1498 call(new RGWAWSStreamObjToCloudMultipartPartCR(sc,
1499 source_conn, src_obj,
1500 target,
1501 dest_obj,
1502 status.src_properties,
1503 status.upload_id,
1504 cur_part_info,
1505 &cur_part_info.etag));
1506 }
1507
1508 if (retcode < 0) {
1509 ldpp_dout(dpp, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
1510 ret_err = retcode;
1511 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
1512 return set_cr_error(ret_err);
1513 }
1514
1515 yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->driver, status_obj, status));
1516 if (retcode < 0) {
1517 ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
1518 /* continue with upload anyway */
1519 }
1520 ldpp_dout(dpp, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
1521 }
1522
1523 yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts));
1524 if (retcode < 0) {
1525 ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
1526 ret_err = retcode;
1527 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
1528 return set_cr_error(ret_err);
1529 }
1530
1531 /* remove status obj */
1532 yield call(new RGWRadosRemoveCR(sync_env->driver, status_obj));
1533 if (retcode < 0) {
1534 ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
1535 /* ignore error, best effort */
1536 }
1537 return set_cr_done();
1538 }
1539
1540 return 0;
1541 }
1542 };
1543 template <class T>
1544 int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
1545 {
1546 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1547 if (iter == attrs.end()) {
1548 *result = def_val;
1549 return 0;
1550 }
1551 bufferlist& bl = iter->second;
1552 if (bl.length() == 0) {
1553 *result = def_val;
1554 return 0;
1555 }
1556 auto bliter = bl.cbegin();
1557 try {
1558 decode(*result, bliter);
1559 } catch (buffer::error& err) {
1560 return -EIO;
1561 }
1562 return 0;
1563 }
1564
1565 // maybe use Fetch Remote Obj instead?
1566 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
1567 rgw_bucket_sync_pipe sync_pipe;
1568 AWSSyncInstanceEnv& instance;
1569
1570 uint64_t versioned_epoch{0};
1571
1572 RGWRESTConn *source_conn{nullptr};
1573 std::shared_ptr<AWSSyncConfig_Profile> target;
1574 bufferlist res;
1575 unordered_map <string, bool> bucket_created;
1576 rgw_rest_obj rest_obj;
1577 int ret{0};
1578
1579 uint32_t src_zone_short_id{0};
1580 uint64_t src_pg_ver{0};
1581
1582 bufferlist out_bl;
1583
1584 struct CreateBucketResult {
1585 string code;
1586
1587 void decode_xml(XMLObj *obj) {
1588 RGWXMLDecoder::decode_xml("Code", code, obj);
1589 }
1590 } result;
1591
1592 rgw_obj src_obj;
1593 rgw_obj dest_obj;
1594
1595 public:
1596 RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1597 rgw_bucket_sync_pipe& _sync_pipe,
1598 rgw_obj_key& _key,
1599 AWSSyncInstanceEnv& _instance,
1600 uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1601 sync_pipe(_sync_pipe),
1602 instance(_instance), versioned_epoch(_versioned_epoch)
1603 {}
1604
1605 ~RGWAWSHandleRemoteObjCBCR(){
1606 }
1607
1608 int operate(const DoutPrefixProvider *dpp) override {
1609 reenter(this) {
1610 ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
1611 if (ret < 0) {
1612 ldpp_dout(dpp, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
1613 } else {
1614 ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
1615 if (ret < 0) {
1616 ldpp_dout(dpp, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
1617 src_pg_ver = 0; /* all or nothing */
1618 }
1619 }
1620 ldpp_dout(dpp, 4) << "AWS: download begin: z=" << sc->source_zone
1621 << " b=" << src_bucket << " k=" << key << " size=" << size
1622 << " mtime=" << mtime << " etag=" << etag
1623 << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
1624 << dendl;
1625
1626 source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
1627 if (!source_conn) {
1628 ldpp_dout(dpp, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl;
1629 return set_cr_error(-EINVAL);
1630 }
1631
1632 instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
1633 instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &dest_obj.bucket.name, &dest_obj.key.name);
1634
1635 if (bucket_created.find(dest_obj.bucket.name) == bucket_created.end()){
1636 yield {
1637 ldpp_dout(dpp, 0) << "AWS: creating bucket " << dest_obj.bucket.name << dendl;
1638 bufferlist bl;
1639 call(new RGWPutRawRESTResourceCR <bufferlist> (sc->cct, target->conn.get(),
1640 sync_env->http_manager,
1641 dest_obj.bucket.name, nullptr, bl, &out_bl));
1642 }
1643 if (retcode < 0 ) {
1644 RGWXMLDecoder::XMLParser parser;
1645 if (!parser.init()) {
1646 ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
1647 return set_cr_error(retcode);
1648 }
1649
1650 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1651 string str(out_bl.c_str(), out_bl.length());
1652 ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
1653 return set_cr_error(retcode);
1654 }
1655
1656 try {
1657 RGWXMLDecoder::decode_xml("Error", result, &parser, true);
1658 } catch (RGWXMLDecoder::err& err) {
1659 string str(out_bl.c_str(), out_bl.length());
1660 ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
1661 return set_cr_error(retcode);
1662 }
1663
1664 if (result.code != "BucketAlreadyOwnedByYou") {
1665 return set_cr_error(retcode);
1666 }
1667 }
1668
1669 bucket_created[dest_obj.bucket.name] = true;
1670 }
1671
1672 yield {
1673 src_obj.bucket = src_bucket;
1674 src_obj.key = key;
1675
1676 /* init output */
1677 rgw_sync_aws_src_obj_properties src_properties;
1678 src_properties.mtime = mtime;
1679 src_properties.etag = etag;
1680 src_properties.zone_short_id = src_zone_short_id;
1681 src_properties.pg_ver = src_pg_ver;
1682 src_properties.versioned_epoch = versioned_epoch;
1683
1684 if (size < instance.conf.s3.multipart_sync_threshold) {
1685 call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, src_obj,
1686 src_properties,
1687 target,
1688 dest_obj));
1689 } else {
1690 rgw_rest_obj rest_obj;
1691 rest_obj.init(key);
1692 if (do_decode_rest_obj(dpp, sc->cct, attrs, headers, &rest_obj)) {
1693 ldpp_dout(dpp, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
1694 return set_cr_error(-EINVAL);
1695 }
1696 call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, src_obj,
1697 target, dest_obj, size, src_properties, rest_obj));
1698 }
1699 }
1700 if (retcode < 0) {
1701 return set_cr_error(retcode);
1702 }
1703
1704 return set_cr_done();
1705 }
1706
1707 return 0;
1708 }
1709 };
1710
1711 class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
1712 rgw_bucket_sync_pipe sync_pipe;
1713 AWSSyncInstanceEnv& instance;
1714 uint64_t versioned_epoch;
1715 public:
1716 RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1717 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1718 AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1719 sync_pipe(_sync_pipe),
1720 instance(_instance), versioned_epoch(_versioned_epoch) {
1721 }
1722
1723 ~RGWAWSHandleRemoteObjCR() {}
1724
1725 RGWStatRemoteObjCBCR *allocate_callback() override {
1726 return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch);
1727 }
1728 };
1729
1730 class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
1731 RGWDataSyncCtx *sc;
1732 std::shared_ptr<AWSSyncConfig_Profile> target;
1733 rgw_bucket_sync_pipe sync_pipe;
1734 rgw_obj_key key;
1735 ceph::real_time mtime;
1736 AWSSyncInstanceEnv& instance;
1737 int ret{0};
1738 public:
1739 RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
1740 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1741 AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc),
1742 sync_pipe(_sync_pipe), key(_key),
1743 mtime(_mtime), instance(_instance) {}
1744 int operate(const DoutPrefixProvider *dpp) override {
1745 reenter(this) {
1746 ldpp_dout(dpp, 0) << ": remove remote obj: z=" << sc->source_zone
1747 << " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
1748 yield {
1749 instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
1750 string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
1751 ldpp_dout(dpp, 0) << "AWS: removing aws object at" << path << dendl;
1752
1753 call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(),
1754 sc->env->http_manager,
1755 path, nullptr /* params */));
1756 }
1757 if (retcode < 0) {
1758 return set_cr_error(retcode);
1759 }
1760 return set_cr_done();
1761 }
1762 return 0;
1763 }
1764
1765 };
1766
1767
1768 class RGWAWSDataSyncModule: public RGWDataSyncModule {
1769 CephContext *cct;
1770 AWSSyncInstanceEnv instance;
1771 public:
1772 RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) :
1773 cct(_cct),
1774 instance(_conf) {
1775 }
1776
1777 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1778 instance.init(sc, instance_id);
1779 }
1780
1781 ~RGWAWSDataSyncModule() {}
1782
1783 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
1784 std::optional<uint64_t> versioned_epoch,
1785 const rgw_zone_set_entry& source_trace_entry,
1786 rgw_zone_set *zones_trace) override {
1787 ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1788 return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
1789 }
1790 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
1791 rgw_zone_set *zones_trace) override {
1792 ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1793 return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
1794 }
1795 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
1796 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
1797 rgw_zone_set *zones_trace) override {
1798 ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
1799 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1800 return NULL;
1801 }
1802 };
1803
1804 class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
1805 RGWAWSDataSyncModule data_handler;
1806 public:
1807 RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
1808 RGWDataSyncModule *get_data_handler() override {
1809 return &data_handler;
1810 }
1811 };
1812
1813 int RGWAWSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
1814 AWSSyncConfig conf;
1815
1816 int r = conf.init(dpp, cct, config);
1817 if (r < 0) {
1818 return r;
1819 }
1820
1821 instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
1822 return 0;
1823 }