]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_aws.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_sync_module_aws.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4#include "common/errno.h"
5
6#include "rgw_common.h"
7#include "rgw_coroutine.h"
8#include "rgw_sync_module.h"
9#include "rgw_data_sync.h"
10#include "rgw_sync_module_aws.h"
11#include "rgw_cr_rados.h"
12#include "rgw_rest_conn.h"
13#include "rgw_cr_rest.h"
14#include "rgw_acl.h"
15#include "rgw_zone.h"
16
17#include "services/svc_zone.h"
18
19#include <boost/asio/yield.hpp>
20
21#define dout_subsys ceph_subsys_rgw
22
23
/* Default (32 MiB) for both s3.multipart_sync_threshold and
 * s3.multipart_min_part_size when the "s3" config section omits them. */
#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)

/* Target path used when neither a profile nor the "default" profile sets
 * "target_path"; the ${...} variables in it are expanded at runtime. */
static std::string default_target_path{"rgw-${zonegroup}-${sid}/${bucket}"};
27
28static string get_key_oid(const rgw_obj_key& key)
29{
30 string oid = key.name;
31 if (!key.instance.empty() &&
32 !key.have_null_instance()) {
33 oid += string(":") + key.instance;
34 }
35 return oid;
36}
37
38static string obj_to_aws_path(const rgw_obj& obj)
39{
40 string path = obj.bucket.name + "/" + get_key_oid(obj.key);
41
42
43 return path;
44}
45
46/*
47
48 json configuration definition:
49
50 {
51 "connection": {
52 "access_key": <access>,
53 "secret": <secret>,
54 "endpoint": <endpoint>,
55 "host_style": <path | virtual>,
56 },
57 "acls": [ { "type": <id | email | uri>,
58 "source_id": <source_id>,
59 "dest_id": <dest_id> } ... ], # optional, acl mappings, no mappings if it does not exist
60 "target_path": <target_path>, # override default
61
62
63 # anything below here is for non trivial configuration
64 # can be used in conjunction with the above
65
66 "default": {
67 "connection": {
68 "access_key": <access>,
69 "secret": <secret>,
70 "endpoint": <endpoint>,
71 "host_style": <path | virtual>,
72 },
73 "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
74 {
75 "type" : <id | email | uri>, # optional, default is id
76 "source_id": <id>,
77 "dest_id": <id>
78 } ... ]
79 "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
80 # final object name will be target_path + "/" + obj
81 },
82 "connections": [
83 {
84 "id": <id>,
85 "access_key": <access>,
86 "secret": <secret>,
87 "endpoint": <endpoint>,
88 } ... ],
89 "acl_profiles": [
90 {
91 "id": <id>, # acl mappings
92 "acls": [ {
93 "type": <id | email | uri>,
94 "source_id": <id>,
95 "dest_id": <id>
96 } ... ]
97 }
98 ],
99 "profiles": [
100 {
101 "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
102 "target_path": <dest>, # (override default)
103 "connection_id": <connection_id>, # optional, if empty references default connection
104 "acls_id": <mappings_id>, # optional, if empty references default mappings
105 } ... ],
106 }
107
108target path optional variables:
109
110(evaluated at init)
111sid: sync instance id, randomly generated by sync process on first sync initialization
112zonegroup: zonegroup name
113zonegroup_id: zonegroup id
114zone: zone name
115zone_id: zone id
116
117(evaluated when syncing)
118bucket: bucket name (prefixed with "<tenant>-" when the bucket owner has a tenant)
119owner: bucket owner
120
121*/
122
123struct ACLMapping {
124 ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
125 string source_id;
126 string dest_id;
127
128 ACLMapping() = default;
129
130 ACLMapping(ACLGranteeTypeEnum t,
131 const string& s,
132 const string& d) : type(t),
133 source_id(s),
134 dest_id(d) {}
135
136 void init(const JSONFormattable& config) {
137 const string& t = config["type"];
138
139 if (t == "email") {
140 type = ACL_TYPE_EMAIL_USER;
141 } else if (t == "uri") {
142 type = ACL_TYPE_GROUP;
143 } else {
144 type = ACL_TYPE_CANON_USER;
145 }
146
147 source_id = config["source_id"];
148 dest_id = config["dest_id"];
149 }
150
151 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
152 Formatter::ObjectSection os(jf, "acl_mapping");
153 string s;
154 switch (type) {
155 case ACL_TYPE_EMAIL_USER:
156 s = "email";
157 break;
158 case ACL_TYPE_GROUP:
159 s = "uri";
160 break;
161 default:
162 s = "id";
163 break;
164 }
165 encode_json("type", s, &jf);
166 encode_json("source_id", source_id, &jf);
167 encode_json("dest_id", dest_id, &jf);
168 }
169};
170
171struct ACLMappings {
172 map<string, ACLMapping> acl_mappings;
173
174 void init(const JSONFormattable& config) {
175 for (auto& c : config.array()) {
176 ACLMapping m;
177 m.init(c);
178
179 acl_mappings.emplace(std::make_pair(m.source_id, m));
180 }
181 }
182 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
183 Formatter::ArraySection os(jf, "acls");
184
185 for (auto& i : acl_mappings) {
186 i.second.dump_conf(cct, jf);
187 }
188 }
189};
190
191struct AWSSyncConfig_ACLProfiles {
192 map<string, std::shared_ptr<ACLMappings> > acl_profiles;
193
194 void init(const JSONFormattable& config) {
195 for (auto& c : config.array()) {
196 const string& profile_id = c["id"];
197
198 std::shared_ptr<ACLMappings> ap{new ACLMappings};
199 ap->init(c["acls"]);
200
201 acl_profiles[profile_id] = ap;
202 }
203 }
204 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
205 Formatter::ArraySection section(jf, "acl_profiles");
206
207 for (auto& p : acl_profiles) {
208 Formatter::ObjectSection section(jf, "profile");
209 encode_json("id", p.first, &jf);
210 p.second->dump_conf(cct, jf);
211 }
212 }
213
214 bool find(const string& profile_id, ACLMappings *result) const {
215 auto iter = acl_profiles.find(profile_id);
216 if (iter == acl_profiles.end()) {
217 return false;
218 }
219 *result = *iter->second;
220 return true;
221 }
222};
223
224struct AWSSyncConfig_Connection {
225 string connection_id;
226 string endpoint;
227 RGWAccessKey key;
228 HostStyle host_style{PathStyle};
229
230 bool has_endpoint{false};
231 bool has_key{false};
232 bool has_host_style{false};
233
234 void init(const JSONFormattable& config) {
235 has_endpoint = config.exists("endpoint");
236 has_key = config.exists("access_key") || config.exists("secret");
237 has_host_style = config.exists("host_style");
238
239 connection_id = config["id"];
240 endpoint = config["endpoint"];
241
242 key = RGWAccessKey(config["access_key"], config["secret"]);
243 string host_style_str = config["host_style"];
244 if (host_style_str != "virtual") {
245 host_style = PathStyle;
246 } else {
247 host_style = VirtualStyle;
248 }
249 }
250 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
251 Formatter::ObjectSection section(jf, "connection");
252 encode_json("id", connection_id, &jf);
253 encode_json("endpoint", endpoint, &jf);
254 string s = (host_style == PathStyle ? "path" : "virtual");
255 encode_json("host_style", s, &jf);
256
257 {
258 Formatter::ObjectSection os(jf, "key");
259 encode_json("access_key", key.id, &jf);
260 string secret = (key.key.empty() ? "" : "******");
261 encode_json("secret", secret, &jf);
262 }
263 }
264};
265
266static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
267{
268 string sval;
269 if (config.find(key, &sval)) {
270 string err;
271 uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
272 if (!err.empty()) {
273 ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
274 return -EINVAL;
275 }
276 *pval = val;
277 }
278 return 0;
279}
280
281struct AWSSyncConfig_S3 {
282 uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
283 uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
284
285 int init(CephContext *cct, const JSONFormattable& config) {
286 int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
287 if (r < 0) {
288 return r;
289 }
290
291 r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size);
292 if (r < 0) {
293 return r;
294 }
295#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
296 if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
297 multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
298 }
299 return 0;
300 }
301
302 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
303 Formatter::ObjectSection section(jf, "s3");
304 encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf);
305 encode_json("multipart_min_part_size", multipart_min_part_size, &jf);
306 }
307};
308
309struct AWSSyncConfig_Profile {
310 string source_bucket;
311 bool prefix{false};
312 string target_path;
313 string connection_id;
314 string acls_id;
315
316 std::shared_ptr<AWSSyncConfig_Connection> conn_conf;
317 std::shared_ptr<ACLMappings> acls;
318
319 std::shared_ptr<RGWRESTConn> conn;
320
321 void init(const JSONFormattable& config) {
322 source_bucket = config["source_bucket"];
323
324 prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*');
325
326 if (prefix) {
327 source_bucket = source_bucket.substr(0, source_bucket.size() - 1);
328 }
329
330 target_path = config["target_path"];
331 connection_id = config["connection_id"];
332 acls_id = config["acls_id"];
333
334 if (config.exists("connection")) {
335 conn_conf = make_shared<AWSSyncConfig_Connection>();
336 conn_conf->init(config["connection"]);
337 }
338
339 if (config.exists("acls")) {
340 acls = make_shared<ACLMappings>();
341 acls->init(config["acls"]);
342 }
343 }
344
345 void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const {
346 Formatter::ObjectSection config(jf, section);
347 string sb{source_bucket};
348 if (prefix) {
349 sb.append("*");
350 }
351 encode_json("source_bucket", sb, &jf);
352 encode_json("target_path", target_path, &jf);
353 encode_json("connection_id", connection_id, &jf);
354 encode_json("acls_id", acls_id, &jf);
355 if (conn_conf.get()) {
356 conn_conf->dump_conf(cct, jf);
357 }
358 if (acls.get()) {
359 acls->dump_conf(cct, jf);
360 }
361 }
362};
363
/*
 * Copy `src` into *dest with every occurrence of `find` replaced by `replace`.
 *
 * Occurrences are located in `src` only, so:
 *  - occurrences right after a replacement are never skipped, even when
 *    `replace` is shorter than `find`;
 *  - text inserted by `replace` is never rescanned. Rescanning could loop
 *    forever when `replace` contains `find`.
 * An empty `find` leaves the string unchanged. `src` and `*dest` may refer
 * to the same string.
 */
static void find_and_replace(const std::string& src, const std::string& find, const std::string& replace, std::string *dest)
{
  if (find.empty()) {
    *dest = src;
    return;
  }

  std::string result;
  result.reserve(src.size());

  std::string::size_type start = 0;
  for (auto pos = src.find(find); pos != std::string::npos; pos = src.find(find, start)) {
    result.append(src, start, pos - start);
    result.append(replace);
    start = pos + find.size();
  }
  result.append(src, start, std::string::npos);

  *dest = std::move(result);
}
377
378static void apply_meta_param(const string& src, const string& param, const string& val, string *dest)
379{
380 string s = string("${") + param + "}";
381 find_and_replace(src, s, val, dest);
382}
383
384
385struct AWSSyncConfig {
386 AWSSyncConfig_Profile default_profile;
387 std::shared_ptr<AWSSyncConfig_Profile> root_profile;
388
389 map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections;
390 AWSSyncConfig_ACLProfiles acl_profiles;
391
392 map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles;
393
394 AWSSyncConfig_S3 s3;
395
396 int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
397 bool connection_must_exist) {
398 if (!profile.connection_id.empty()) {
399 if (profile.conn_conf) {
400 ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
401 return -EINVAL;
402 }
403 if (connections.find(profile.connection_id) == connections.end()) {
404 ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
405 return -EINVAL;
406 }
407 profile.conn_conf = connections[profile.connection_id];
408 } else if (!profile.conn_conf) {
409 profile.connection_id = default_profile.connection_id;
410 auto i = connections.find(profile.connection_id);
411 if (i != connections.end()) {
412 profile.conn_conf = i->second;
413 }
414 }
415
416 if (connection_must_exist && !profile.conn_conf) {
417 ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
418 return -EINVAL;
419 }
420
421 if (profile.conn_conf && default_profile.conn_conf) {
422 if (!profile.conn_conf->has_endpoint) {
423 profile.conn_conf->endpoint = default_profile.conn_conf->endpoint;
424 }
425 if (!profile.conn_conf->has_host_style) {
426 profile.conn_conf->host_style = default_profile.conn_conf->host_style;
427 }
428 if (!profile.conn_conf->has_key) {
429 profile.conn_conf->key = default_profile.conn_conf->key;
430 }
431 }
432
433 ACLMappings acl_mappings;
434
435 if (!profile.acls_id.empty()) {
436 if (!acl_profiles.find(profile.acls_id, &acl_mappings)) {
437 ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
438 return -EINVAL;
439 }
440 profile.acls = acl_profiles.acl_profiles[profile.acls_id];
441 } else if (!profile.acls) {
442 if (default_profile.acls) {
443 profile.acls = default_profile.acls;
444 profile.acls_id = default_profile.acls_id;
445 }
446 }
447
448 if (profile.target_path.empty()) {
449 profile.target_path = default_profile.target_path;
450 }
451 if (profile.target_path.empty()) {
452 profile.target_path = default_target_path;
453 }
454
455 return 0;
456 }
457
458 int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
459 std::shared_ptr<AWSSyncConfig_Profile> profile;
460 profile.reset(new AWSSyncConfig_Profile);
461 profile->init(profile_conf);
462
463 int ret = init_profile(cct, profile_conf, *profile, true);
464 if (ret < 0) {
465 return ret;
466 }
467
468 auto& sb = profile->source_bucket;
469
470 if (explicit_profiles.find(sb) != explicit_profiles.end()) {
471 ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
472 }
473
474 explicit_profiles[sb] = profile;
475 if (ptarget) {
476 *ptarget = profile;
477 }
478 return 0;
479 }
480
481 bool do_find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
482 const string& name = bucket.name;
483 auto iter = explicit_profiles.upper_bound(name);
484 if (iter == explicit_profiles.begin()) {
485 return false;
486 }
487
488 --iter;
489 if (iter->first.size() > name.size()) {
490 return false;
491 }
492 if (name.compare(0, iter->first.size(), iter->first) != 0) {
493 return false;
494 }
495
496 std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second;
497
498 if (!target->prefix &&
499 name.size() != iter->first.size()) {
500 return false;
501 }
502
503 *result = target;
504 return true;
505 }
506
507 void find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
508 if (!do_find_profile(bucket, result)) {
509 *result = root_profile;
510 }
511 }
512
513 AWSSyncConfig() {}
514
515 int init(CephContext *cct, const JSONFormattable& config) {
516 auto& default_conf = config["default"];
517
518 if (config.exists("default")) {
519 default_profile.init(default_conf);
520 init_profile(cct, default_conf, default_profile, false);
521 }
522
523 for (auto& conn : config["connections"].array()) {
524 auto new_conn = conn;
525
526 std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection};
527 c->init(new_conn);
528
529 connections[new_conn["id"]] = c;
530 }
531
532 acl_profiles.init(config["acl_profiles"]);
533
534 int r = s3.init(cct, config["s3"]);
535 if (r < 0) {
536 return r;
537 }
538
539 auto new_root_conf = config;
540
541 r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */
542 if (r < 0) {
543 return r;
544 }
545
546 for (auto target_conf : config["profiles"].array()) {
547 int r = init_target(cct, target_conf, nullptr);
548 if (r < 0) {
549 return r;
550 }
551 }
552
553 JSONFormatter jf(true);
554 dump_conf(cct, jf);
555 stringstream ss;
556 jf.flush(ss);
557
558 ldout(cct, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;
559
560 return 0;
561 }
562
9f95a23c 563 void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) {
11fdf7f2
TL
564 apply_meta_param(path, "sid", sid, dest);
565
9f95a23c 566 const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup();
11fdf7f2
TL
567 apply_meta_param(path, "zonegroup", zg.get_name(), dest);
568 apply_meta_param(path, "zonegroup_id", zg.get_id(), dest);
569
9f95a23c 570 const RGWZone& zone = sc->env->svc->zone->get_zone();
11fdf7f2
TL
571 apply_meta_param(path, "zone", zone.name, dest);
572 apply_meta_param(path, "zone_id", zone.id, dest);
573 }
574
9f95a23c
TL
575 void update_config(RGWDataSyncCtx *sc, const string& sid) {
576 expand_target(sc, sid, root_profile->target_path, &root_profile->target_path);
577 ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
11fdf7f2 578 for (auto& t : explicit_profiles) {
9f95a23c
TL
579 expand_target(sc, sid, t.second->target_path, &t.second->target_path);
580 ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
11fdf7f2
TL
581 }
582 }
583
584 void dump_conf(CephContext *cct, JSONFormatter& jf) const {
585 Formatter::ObjectSection config(jf, "config");
586 root_profile->dump_conf(cct, jf);
587 jf.open_array_section("connections");
588 for (auto c : connections) {
589 c.second->dump_conf(cct, jf);
590 }
591 jf.close_section();
592
593 acl_profiles.dump_conf(cct, jf);
594
595 { // targets
596 Formatter::ArraySection as(jf, "profiles");
597 for (auto& t : explicit_profiles) {
598 Formatter::ObjectSection target_section(jf, "profile");
599 encode_json("name", t.first, &jf);
600 t.second->dump_conf(cct, jf);
601 }
602 }
603 }
604
605 string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile,
606 const RGWBucketInfo& bucket_info,
607 const rgw_obj_key& obj) {
608 string bucket_str;
609 string owner;
610 if (!bucket_info.owner.tenant.empty()) {
611 bucket_str = owner = bucket_info.owner.tenant + "-";
612 owner += bucket_info.owner.id;
613 }
614 bucket_str += bucket_info.bucket.name;
615
616 const string& path = profile->target_path;
617
618 string new_path;
619 apply_meta_param(path, "bucket", bucket_str, &new_path);
620 apply_meta_param(new_path, "owner", owner, &new_path);
621
622 new_path += string("/") + get_key_oid(obj);
623
624 return new_path;
625 }
626
627 void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile,
628 const RGWBucketInfo& bucket_info,
629 const rgw_obj_key& obj,
630 string *bucket_name,
631 string *obj_name) {
632 string path = get_path(profile, bucket_info, obj);
633 size_t pos = path.find('/');
634
635 *bucket_name = path.substr(0, pos);
636 *obj_name = path.substr(pos + 1);
637 }
638
9f95a23c
TL
639 void init_conns(RGWDataSyncCtx *sc, const string& id) {
640 auto sync_env = sc->env;
641
642 update_config(sc, id);
11fdf7f2
TL
643
644 auto& root_conf = root_profile->conn_conf;
645
9f95a23c
TL
646 root_profile->conn.reset(new S3RESTConn(sc->cct,
647 sync_env->svc->zone,
11fdf7f2
TL
648 id,
649 { root_conf->endpoint },
650 root_conf->key,
651 root_conf->host_style));
652
653 for (auto i : explicit_profiles) {
654 auto& c = i.second;
655
9f95a23c
TL
656 c->conn.reset(new S3RESTConn(sc->cct,
657 sync_env->svc->zone,
11fdf7f2
TL
658 id,
659 { c->conn_conf->endpoint },
660 c->conn_conf->key,
661 c->conn_conf->host_style));
662 }
663 }
664};
665
666
667struct AWSSyncInstanceEnv {
668 AWSSyncConfig conf;
669 string id;
670
671 explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {}
672
9f95a23c 673 void init(RGWDataSyncCtx *sc, uint64_t instance_id) {
11fdf7f2
TL
674 char buf[32];
675 snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
676 id = buf;
677
9f95a23c 678 conf.init_conns(sc, id);
11fdf7f2
TL
679 }
680
681 void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
682 conf.find_profile(bucket, ptarget);
683 ceph_assert(ptarget);
684 }
685};
686
687static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
688{
689 for (auto header : headers) {
690 const string& val = header.second;
691 if (header.first == "RGWX_OBJECT_SIZE") {
692 info->content_len = atoi(val.c_str());
693 } else {
694 info->attrs[header.first] = val;
695 }
696 }
697
698 info->acls.set_ctx(cct);
699 auto aiter = attrs.find(RGW_ATTR_ACL);
700 if (aiter != attrs.end()) {
701 bufferlist& bl = aiter->second;
702 auto bliter = bl.cbegin();
703 try {
704 info->acls.decode(bliter);
705 } catch (buffer::error& err) {
706 ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
707 return -EIO;
708 }
709 } else {
710 ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
711 }
712
713 return 0;
714}
715
716class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
717{
9f95a23c 718 RGWDataSyncCtx *sc;
11fdf7f2
TL
719 RGWRESTConn *conn;
720 rgw_obj src_obj;
721 RGWRESTConn::get_obj_params req_params;
722
723 rgw_sync_aws_src_obj_properties src_properties;
724public:
725 RGWRESTStreamGetCRF(CephContext *_cct,
726 RGWCoroutinesEnv *_env,
727 RGWCoroutine *_caller,
9f95a23c 728 RGWDataSyncCtx *_sc,
11fdf7f2
TL
729 RGWRESTConn *_conn,
730 rgw_obj& _src_obj,
731 const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller,
9f95a23c
TL
732 _sc->env->http_manager, _src_obj.key),
733 sc(_sc), conn(_conn), src_obj(_src_obj),
11fdf7f2
TL
734 src_properties(_src_properties) {
735 }
736
737 int init() override {
738 /* init input connection */
739
740
741 req_params.get_op = true;
742 req_params.prepend_metadata = true;
743
744 req_params.unmod_ptr = &src_properties.mtime;
745 req_params.etag = src_properties.etag;
746 req_params.mod_zone_id = src_properties.zone_short_id;
747 req_params.mod_pg_ver = src_properties.pg_ver;
748
749 if (range.is_set) {
750 req_params.range_is_set = true;
751 req_params.range_start = range.ofs;
752 req_params.range_end = range.ofs + range.size - 1;
753 }
754
755 RGWRESTStreamRWRequest *in_req;
756 int ret = conn->get_obj(src_obj, req_params, false /* send */, &in_req);
757 if (ret < 0) {
9f95a23c 758 ldout(sc->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
11fdf7f2
TL
759 return ret;
760 }
761
762 set_req(in_req);
763
764 return RGWStreamReadHTTPResourceCRF::init();
765 }
766
767 int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
768 map<string, bufferlist> src_attrs;
769
9f95a23c 770 ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
11fdf7f2
TL
771
772 if (extra_data.length() > 0) {
773 JSONParser jp;
774 if (!jp.parse(extra_data.c_str(), extra_data.length())) {
9f95a23c 775 ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
11fdf7f2
TL
776 return -EIO;
777 }
778
779 JSONDecoder::decode_json("attrs", src_attrs, &jp);
780 }
9f95a23c 781 return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj);
11fdf7f2
TL
782 }
783
784 bool need_extra_data() override {
785 return true;
786 }
787};
788
789static std::set<string> keep_headers = { "CONTENT_TYPE",
790 "CONTENT_ENCODING",
791 "CONTENT_DISPOSITION",
792 "CONTENT_LANGUAGE" };
793
794class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
795{
9f95a23c 796 RGWDataSyncCtx *sc;
11fdf7f2
TL
797 rgw_sync_aws_src_obj_properties src_properties;
798 std::shared_ptr<AWSSyncConfig_Profile> target;
799 rgw_obj dest_obj;
800 string etag;
801public:
802 RGWAWSStreamPutCRF(CephContext *_cct,
803 RGWCoroutinesEnv *_env,
804 RGWCoroutine *_caller,
9f95a23c 805 RGWDataSyncCtx *_sc,
11fdf7f2
TL
806 const rgw_sync_aws_src_obj_properties& _src_properties,
807 std::shared_ptr<AWSSyncConfig_Profile>& _target,
9f95a23c
TL
808 rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager),
809 sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) {
11fdf7f2
TL
810 }
811
812 int init() override {
813 /* init output connection */
814 RGWRESTStreamS3PutObj *out_req{nullptr};
815
816 if (multipart.is_multipart) {
817 char buf[32];
818 snprintf(buf, sizeof(buf), "%d", multipart.part_num);
819 rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
820 { "partNumber", buf },
821 { nullptr, nullptr } };
822 target->conn->put_obj_send_init(dest_obj, params, &out_req);
823 } else {
824 target->conn->put_obj_send_init(dest_obj, nullptr, &out_req);
825 }
826
827 set_req(out_req);
828
829 return RGWStreamWriteHTTPResourceCRF::init();
830 }
831
832 static bool keep_attr(const string& h) {
833 return (keep_headers.find(h) != keep_headers.end() ||
834 boost::algorithm::starts_with(h, "X_AMZ_"));
835 }
836
837 static void init_send_attrs(CephContext *cct,
838 const rgw_rest_obj& rest_obj,
839 const rgw_sync_aws_src_obj_properties& src_properties,
840 const AWSSyncConfig_Profile *target,
841 map<string, string> *attrs) {
842 auto& new_attrs = *attrs;
843
844 new_attrs.clear();
845
846 for (auto& hi : rest_obj.attrs) {
847 if (keep_attr(hi.first)) {
848 new_attrs.insert(hi);
849 }
850 }
851
852 auto acl = rest_obj.acls.get_acl();
853
854 map<int, vector<string> > access_map;
855
856 if (target->acls) {
857 for (auto& grant : acl.get_grant_map()) {
858 auto& orig_grantee = grant.first;
859 auto& perm = grant.second;
860
861 string grantee;
862
863 const auto& am = target->acls->acl_mappings;
864
865 auto iter = am.find(orig_grantee);
866 if (iter == am.end()) {
867 ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
868 continue;
869 }
870
871 grantee = iter->second.dest_id;
872
873 string type;
874
875 switch (iter->second.type) {
876 case ACL_TYPE_CANON_USER:
877 type = "id";
878 break;
879 case ACL_TYPE_EMAIL_USER:
880 type = "emailAddress";
881 break;
882 case ACL_TYPE_GROUP:
883 type = "uri";
884 break;
885 default:
886 continue;
887 }
888
889 string tv = type + "=" + grantee;
890
891 int flags = perm.get_permission().get_permissions();
892 if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
893 access_map[flags].push_back(tv);
894 continue;
895 }
896
897 for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
898 if (flags & i) {
899 access_map[i].push_back(tv);
900 }
901 }
902 }
903 }
904
905 for (auto aiter : access_map) {
906 int grant_type = aiter.first;
907
908 string header_str("x-amz-grant-");
909
910 switch (grant_type) {
911 case RGW_PERM_READ:
912 header_str.append("read");
913 break;
914 case RGW_PERM_WRITE:
915 header_str.append("write");
916 break;
917 case RGW_PERM_READ_ACP:
918 header_str.append("read-acp");
919 break;
920 case RGW_PERM_WRITE_ACP:
921 header_str.append("write-acp");
922 break;
923 case RGW_PERM_FULL_CONTROL:
924 header_str.append("full-control");
925 break;
926 }
927
928 string s;
929
930 for (auto viter : aiter.second) {
931 if (!s.empty()) {
932 s.append(", ");
933 }
934 s.append(viter);
935 }
936
937 ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
938
939 new_attrs[header_str] = s;
940 }
941
942 char buf[32];
943 snprintf(buf, sizeof(buf), "%llu", (long long)src_properties.versioned_epoch);
944 new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
945
946 utime_t ut(src_properties.mtime);
947 snprintf(buf, sizeof(buf), "%lld.%09lld",
948 (long long)ut.sec(),
949 (long long)ut.nsec());
950
951 new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
952 new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag;
953 new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
954 if (!rest_obj.key.instance.empty()) {
955 new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
956 }
957 }
958
959 void send_ready(const rgw_rest_obj& rest_obj) override {
960 RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
961
962 map<string, string> new_attrs;
963 if (!multipart.is_multipart) {
9f95a23c 964 init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
11fdf7f2
TL
965 }
966
967 r->set_send_length(rest_obj.content_len);
968
969 RGWAccessControlPolicy policy;
970
971 r->send_ready(target->conn->get_key(), new_attrs, policy, false);
972 }
973
974 void handle_headers(const map<string, string>& headers) {
975 for (auto h : headers) {
976 if (h.first == "ETAG") {
977 etag = h.second;
978 }
979 }
980 }
981
982 bool get_etag(string *petag) {
983 if (etag.empty()) {
984 return false;
985 }
986 *petag = etag;
987 return true;
988 }
989};
990
991
992class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
9f95a23c 993 RGWDataSyncCtx *sc;
11fdf7f2
TL
994 RGWRESTConn *source_conn;
995 std::shared_ptr<AWSSyncConfig_Profile> target;
996 rgw_obj src_obj;
997 rgw_obj dest_obj;
998
999 rgw_sync_aws_src_obj_properties src_properties;
1000
1001 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
1002 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
1003
1004public:
9f95a23c 1005 RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1006 RGWRESTConn *_source_conn,
1007 const rgw_obj& _src_obj,
1008 const rgw_sync_aws_src_obj_properties& _src_properties,
1009 std::shared_ptr<AWSSyncConfig_Profile> _target,
9f95a23c
TL
1010 const rgw_obj& _dest_obj) : RGWCoroutine(_sc->cct),
1011 sc(_sc),
11fdf7f2
TL
1012 source_conn(_source_conn),
1013 target(_target),
1014 src_obj(_src_obj),
1015 dest_obj(_dest_obj),
1016 src_properties(_src_properties) {}
1017
1018 int operate() override {
1019 reenter(this) {
1020 /* init input */
9f95a23c 1021 in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
11fdf7f2
TL
1022 source_conn, src_obj,
1023 src_properties));
1024
1025 /* init output */
9f95a23c 1026 out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
11fdf7f2
TL
1027 src_properties, target, dest_obj));
1028
9f95a23c 1029 yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
11fdf7f2
TL
1030 if (retcode < 0) {
1031 return set_cr_error(retcode);
1032 }
1033
1034 return set_cr_done();
1035 }
1036
1037 return 0;
1038 }
1039};
1040
1041class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
9f95a23c 1042 RGWDataSyncCtx *sc;
11fdf7f2
TL
1043 RGWRESTConn *source_conn;
1044 std::shared_ptr<AWSSyncConfig_Profile> target;
1045 rgw_obj src_obj;
1046 rgw_obj dest_obj;
1047
1048 rgw_sync_aws_src_obj_properties src_properties;
1049
1050 string upload_id;
1051
1052 rgw_sync_aws_multipart_part_info part_info;
1053
1054 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
1055 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
1056
1057 string *petag;
1058
1059public:
9f95a23c 1060 RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1061 RGWRESTConn *_source_conn,
1062 const rgw_obj& _src_obj,
1063 std::shared_ptr<AWSSyncConfig_Profile>& _target,
1064 const rgw_obj& _dest_obj,
1065 const rgw_sync_aws_src_obj_properties& _src_properties,
1066 const string& _upload_id,
1067 const rgw_sync_aws_multipart_part_info& _part_info,
9f95a23c
TL
1068 string *_petag) : RGWCoroutine(_sc->cct),
1069 sc(_sc),
11fdf7f2
TL
1070 source_conn(_source_conn),
1071 target(_target),
1072 src_obj(_src_obj),
1073 dest_obj(_dest_obj),
1074 src_properties(_src_properties),
1075 upload_id(_upload_id),
1076 part_info(_part_info),
1077 petag(_petag) {}
1078
1079 int operate() override {
1080 reenter(this) {
1081 /* init input */
9f95a23c 1082 in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
11fdf7f2
TL
1083 source_conn, src_obj,
1084 src_properties));
1085
1086 in_crf->set_range(part_info.ofs, part_info.size);
1087
1088 /* init output */
9f95a23c 1089 out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
11fdf7f2
TL
1090 src_properties, target, dest_obj));
1091
1092 out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
1093
9f95a23c 1094 yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
11fdf7f2
TL
1095 if (retcode < 0) {
1096 return set_cr_error(retcode);
1097 }
1098
1099 if (!(static_cast<RGWAWSStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
9f95a23c 1100 ldout(sc->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
11fdf7f2
TL
1101 return set_cr_error(-EIO);
1102 }
1103
1104 return set_cr_done();
1105 }
1106
1107 return 0;
1108 }
1109};
1110
1111class RGWAWSAbortMultipartCR : public RGWCoroutine {
9f95a23c 1112 RGWDataSyncCtx *sc;
11fdf7f2
TL
1113 RGWRESTConn *dest_conn;
1114 rgw_obj dest_obj;
1115
1116 string upload_id;
1117
1118public:
9f95a23c 1119 RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1120 RGWRESTConn *_dest_conn,
1121 const rgw_obj& _dest_obj,
9f95a23c
TL
1122 const string& _upload_id) : RGWCoroutine(_sc->cct),
1123 sc(_sc),
11fdf7f2
TL
1124 dest_conn(_dest_conn),
1125 dest_obj(_dest_obj),
1126 upload_id(_upload_id) {}
1127
1128 int operate() override {
1129 reenter(this) {
1130
1131 yield {
1132 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
1133 bufferlist bl;
9f95a23c 1134 call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager,
11fdf7f2
TL
1135 obj_to_aws_path(dest_obj), params));
1136 }
1137
1138 if (retcode < 0) {
9f95a23c 1139 ldout(sc->cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
11fdf7f2
TL
1140 return set_cr_error(retcode);
1141 }
1142
1143 return set_cr_done();
1144 }
1145
1146 return 0;
1147 }
1148};
1149
1150class RGWAWSInitMultipartCR : public RGWCoroutine {
9f95a23c 1151 RGWDataSyncCtx *sc;
11fdf7f2
TL
1152 RGWRESTConn *dest_conn;
1153 rgw_obj dest_obj;
1154
1155 uint64_t obj_size;
1156 map<string, string> attrs;
1157
1158 bufferlist out_bl;
1159
1160 string *upload_id;
1161
1162 struct InitMultipartResult {
1163 string bucket;
1164 string key;
1165 string upload_id;
1166
1167 void decode_xml(XMLObj *obj) {
1168 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
1169 RGWXMLDecoder::decode_xml("Key", key, obj);
1170 RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
1171 }
1172 } result;
1173
1174public:
9f95a23c 1175 RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1176 RGWRESTConn *_dest_conn,
1177 const rgw_obj& _dest_obj,
1178 uint64_t _obj_size,
1179 const map<string, string>& _attrs,
9f95a23c
TL
1180 string *_upload_id) : RGWCoroutine(_sc->cct),
1181 sc(_sc),
11fdf7f2
TL
1182 dest_conn(_dest_conn),
1183 dest_obj(_dest_obj),
1184 obj_size(_obj_size),
1185 attrs(_attrs),
1186 upload_id(_upload_id) {}
1187
1188 int operate() override {
1189 reenter(this) {
1190
1191 yield {
1192 rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
1193 bufferlist bl;
9f95a23c 1194 call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
11fdf7f2
TL
1195 obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
1196 }
1197
1198 if (retcode < 0) {
9f95a23c 1199 ldout(sc->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
11fdf7f2
TL
1200 return set_cr_error(retcode);
1201 }
1202 {
1203 /*
1204 * If one of the following fails we cannot abort upload, as we cannot
1205 * extract the upload id. If one of these fail it's very likely that that's
1206 * the least of our problem.
1207 */
1208 RGWXMLDecoder::XMLParser parser;
1209 if (!parser.init()) {
9f95a23c 1210 ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
11fdf7f2
TL
1211 return set_cr_error(-EIO);
1212 }
1213
1214 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1215 string str(out_bl.c_str(), out_bl.length());
9f95a23c 1216 ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
11fdf7f2
TL
1217 return set_cr_error(-EIO);
1218 }
1219
1220 try {
1221 RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
1222 } catch (RGWXMLDecoder::err& err) {
1223 string str(out_bl.c_str(), out_bl.length());
9f95a23c 1224 ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
11fdf7f2
TL
1225 return set_cr_error(-EIO);
1226 }
1227 }
1228
9f95a23c 1229 ldout(sc->cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
11fdf7f2
TL
1230
1231 *upload_id = result.upload_id;
1232
1233 return set_cr_done();
1234 }
1235
1236 return 0;
1237 }
1238};
1239
1240class RGWAWSCompleteMultipartCR : public RGWCoroutine {
9f95a23c 1241 RGWDataSyncCtx *sc;
11fdf7f2
TL
1242 RGWRESTConn *dest_conn;
1243 rgw_obj dest_obj;
1244
1245 bufferlist out_bl;
1246
1247 string upload_id;
1248
1249 struct CompleteMultipartReq {
1250 map<int, rgw_sync_aws_multipart_part_info> parts;
1251
1252 explicit CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}
1253
1254 void dump_xml(Formatter *f) const {
1255 for (auto p : parts) {
1256 f->open_object_section("Part");
1257 encode_xml("PartNumber", p.first, f);
1258 encode_xml("ETag", p.second.etag, f);
1259 f->close_section();
1260 };
1261 }
1262 } req_enc;
1263
1264 struct CompleteMultipartResult {
1265 string location;
1266 string bucket;
1267 string key;
1268 string etag;
1269
1270 void decode_xml(XMLObj *obj) {
1271 RGWXMLDecoder::decode_xml("Location", bucket, obj);
1272 RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
1273 RGWXMLDecoder::decode_xml("Key", key, obj);
1274 RGWXMLDecoder::decode_xml("ETag", etag, obj);
1275 }
1276 } result;
1277
1278public:
9f95a23c 1279 RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1280 RGWRESTConn *_dest_conn,
1281 const rgw_obj& _dest_obj,
1282 string _upload_id,
9f95a23c
TL
1283 const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sc->cct),
1284 sc(_sc),
11fdf7f2
TL
1285 dest_conn(_dest_conn),
1286 dest_obj(_dest_obj),
1287 upload_id(_upload_id),
1288 req_enc(_parts) {}
1289
1290 int operate() override {
1291 reenter(this) {
1292
1293 yield {
1294 rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
1295 stringstream ss;
1296 XMLFormatter formatter;
1297
1298 encode_xml("CompleteMultipartUpload", req_enc, &formatter);
1299
1300 formatter.flush(ss);
1301
1302 bufferlist bl;
1303 bl.append(ss.str());
1304
9f95a23c 1305 call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
11fdf7f2
TL
1306 obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
1307 }
1308
1309 if (retcode < 0) {
9f95a23c 1310 ldout(sc->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
11fdf7f2
TL
1311 return set_cr_error(retcode);
1312 }
1313 {
1314 /*
1315 * If one of the following fails we cannot abort upload, as we cannot
1316 * extract the upload id. If one of these fail it's very likely that that's
1317 * the least of our problem.
1318 */
1319 RGWXMLDecoder::XMLParser parser;
1320 if (!parser.init()) {
9f95a23c 1321 ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
11fdf7f2
TL
1322 return set_cr_error(-EIO);
1323 }
1324
1325 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1326 string str(out_bl.c_str(), out_bl.length());
9f95a23c 1327 ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
11fdf7f2
TL
1328 return set_cr_error(-EIO);
1329 }
1330
1331 try {
1332 RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
1333 } catch (RGWXMLDecoder::err& err) {
1334 string str(out_bl.c_str(), out_bl.length());
9f95a23c 1335 ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
11fdf7f2
TL
1336 return set_cr_error(-EIO);
1337 }
1338 }
1339
9f95a23c 1340 ldout(sc->cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
11fdf7f2
TL
1341
1342 return set_cr_done();
1343 }
1344
1345 return 0;
1346 }
1347};
1348
1349
1350class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
9f95a23c 1351 RGWDataSyncCtx *sc;
11fdf7f2
TL
1352 RGWRESTConn *dest_conn;
1353 const rgw_obj dest_obj;
1354 const rgw_raw_obj status_obj;
1355
1356 string upload_id;
1357
1358public:
1359
9f95a23c 1360 RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1361 RGWRESTConn *_dest_conn,
1362 const rgw_obj& _dest_obj,
1363 const rgw_raw_obj& _status_obj,
9f95a23c 1364 const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc),
11fdf7f2
TL
1365 dest_conn(_dest_conn),
1366 dest_obj(_dest_obj),
1367 status_obj(_status_obj),
1368 upload_id(_upload_id) {}
1369
1370 int operate() override {
1371 reenter(this) {
9f95a23c 1372 yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id));
11fdf7f2 1373 if (retcode < 0) {
9f95a23c 1374 ldout(sc->cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
11fdf7f2
TL
1375 /* ignore error, best effort */
1376 }
9f95a23c 1377 yield call(new RGWRadosRemoveCR(sc->env->store, status_obj));
11fdf7f2 1378 if (retcode < 0) {
9f95a23c 1379 ldout(sc->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
11fdf7f2
TL
1380 /* ignore error, best effort */
1381 }
1382 return set_cr_done();
1383 }
1384
1385 return 0;
1386 }
1387};
1388
1389class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
9f95a23c 1390 RGWDataSyncCtx *sc;
11fdf7f2
TL
1391 RGWDataSyncEnv *sync_env;
1392 AWSSyncConfig& conf;
1393 RGWRESTConn *source_conn;
1394 std::shared_ptr<AWSSyncConfig_Profile> target;
1395 rgw_obj src_obj;
1396 rgw_obj dest_obj;
1397
1398 uint64_t obj_size;
1399 string src_etag;
1400 rgw_sync_aws_src_obj_properties src_properties;
1401 rgw_rest_obj rest_obj;
1402
1403 rgw_sync_aws_multipart_upload_info status;
1404
1405 map<string, string> new_attrs;
1406
1407 rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
1408
1409 int ret_err{0};
1410
1411 rgw_raw_obj status_obj;
1412
1413public:
9f95a23c
TL
1414 RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc,
1415 rgw_bucket_sync_pipe& _sync_pipe,
11fdf7f2
TL
1416 AWSSyncConfig& _conf,
1417 RGWRESTConn *_source_conn,
1418 const rgw_obj& _src_obj,
1419 std::shared_ptr<AWSSyncConfig_Profile>& _target,
1420 const rgw_obj& _dest_obj,
1421 uint64_t _obj_size,
1422 const rgw_sync_aws_src_obj_properties& _src_properties,
9f95a23c
TL
1423 const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct),
1424 sc(_sc),
1425 sync_env(_sc->env),
11fdf7f2
TL
1426 conf(_conf),
1427 source_conn(_source_conn),
1428 target(_target),
1429 src_obj(_src_obj),
1430 dest_obj(_dest_obj),
1431 obj_size(_obj_size),
1432 src_properties(_src_properties),
1433 rest_obj(_rest_obj),
9f95a23c
TL
1434 status_obj(sync_env->svc->zone->get_zone_params().log_pool,
1435 RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) {
11fdf7f2
TL
1436 }
1437
1438
1439 int operate() override {
1440 reenter(this) {
9f95a23c 1441 yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(sync_env->async_rados, sync_env->svc->sysobj,
11fdf7f2
TL
1442 status_obj, &status, false));
1443
1444 if (retcode < 0 && retcode != -ENOENT) {
9f95a23c 1445 ldout(sc->cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
11fdf7f2
TL
1446 return retcode;
1447 }
1448
1449 if (retcode >= 0) {
1450 /* check here that mtime and size did not change */
1451
1452 if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
1453 status.src_properties.etag != src_properties.etag) {
9f95a23c 1454 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
11fdf7f2
TL
1455 retcode = -ENOENT;
1456 }
1457 }
1458
1459 if (retcode == -ENOENT) {
9f95a23c 1460 RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
11fdf7f2 1461
9f95a23c 1462 yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id));
11fdf7f2
TL
1463 if (retcode < 0) {
1464 return set_cr_error(retcode);
1465 }
1466
1467 status.obj_size = obj_size;
1468 status.src_properties = src_properties;
1469#define MULTIPART_MAX_PARTS 10000
1470 uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
1471 status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
1472 status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
1473 status.cur_part = 1;
1474 }
1475
1476 for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
1477 yield {
1478 rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
1479 cur_part_info.part_num = status.cur_part;
1480 cur_part_info.ofs = status.cur_ofs;
1481 cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
1482
1483 pcur_part_info = &cur_part_info;
1484
1485 status.cur_ofs += status.part_size;
1486
9f95a23c 1487 call(new RGWAWSStreamObjToCloudMultipartPartCR(sc,
11fdf7f2
TL
1488 source_conn, src_obj,
1489 target,
1490 dest_obj,
1491 status.src_properties,
1492 status.upload_id,
1493 cur_part_info,
1494 &cur_part_info.etag));
1495 }
1496
1497 if (retcode < 0) {
9f95a23c 1498 ldout(sc->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
11fdf7f2 1499 ret_err = retcode;
9f95a23c 1500 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
11fdf7f2
TL
1501 return set_cr_error(ret_err);
1502 }
1503
9f95a23c 1504 yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(sync_env->async_rados, sync_env->svc->sysobj, status_obj, status));
11fdf7f2 1505 if (retcode < 0) {
9f95a23c 1506 ldout(sc->cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
11fdf7f2
TL
1507 /* continue with upload anyway */
1508 }
9f95a23c 1509 ldout(sc->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
11fdf7f2
TL
1510 }
1511
9f95a23c 1512 yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts));
11fdf7f2 1513 if (retcode < 0) {
9f95a23c 1514 ldout(sc->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
11fdf7f2 1515 ret_err = retcode;
9f95a23c 1516 yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
11fdf7f2
TL
1517 return set_cr_error(ret_err);
1518 }
1519
1520 /* remove status obj */
1521 yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
1522 if (retcode < 0) {
9f95a23c 1523 ldout(sc->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
11fdf7f2
TL
1524 /* ignore error, best effort */
1525 }
1526 return set_cr_done();
1527 }
1528
1529 return 0;
1530 }
1531};
1532template <class T>
1533int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
1534{
1535 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1536 if (iter == attrs.end()) {
1537 *result = def_val;
1538 return 0;
1539 }
1540 bufferlist& bl = iter->second;
1541 if (bl.length() == 0) {
1542 *result = def_val;
1543 return 0;
1544 }
1545 auto bliter = bl.cbegin();
1546 try {
1547 decode(*result, bliter);
1548 } catch (buffer::error& err) {
1549 return -EIO;
1550 }
1551 return 0;
1552}
1553
1554// maybe use Fetch Remote Obj instead?
1555class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
9f95a23c 1556 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1557 AWSSyncInstanceEnv& instance;
1558
1559 uint64_t versioned_epoch{0};
1560
1561 RGWRESTConn *source_conn{nullptr};
1562 std::shared_ptr<AWSSyncConfig_Profile> target;
1563 bufferlist res;
1564 unordered_map <string, bool> bucket_created;
1565 string target_bucket_name;
1566 string target_obj_name;
1567 rgw_rest_obj rest_obj;
1568 int ret{0};
1569
1570 uint32_t src_zone_short_id{0};
1571 uint64_t src_pg_ver{0};
1572
1573 bufferlist out_bl;
1574
1575 struct CreateBucketResult {
1576 string code;
1577
1578 void decode_xml(XMLObj *obj) {
1579 RGWXMLDecoder::decode_xml("Code", code, obj);
1580 }
1581 } result;
1582
1583public:
9f95a23c
TL
1584 RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1585 rgw_bucket_sync_pipe& _sync_pipe,
11fdf7f2
TL
1586 rgw_obj_key& _key,
1587 AWSSyncInstanceEnv& _instance,
9f95a23c
TL
1588 uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1589 sync_pipe(_sync_pipe),
11fdf7f2
TL
1590 instance(_instance), versioned_epoch(_versioned_epoch)
1591 {}
1592
1593 ~RGWAWSHandleRemoteObjCBCR(){
1594 }
1595
1596 int operate() override {
1597 reenter(this) {
1598 ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
1599 if (ret < 0) {
9f95a23c 1600 ldout(sc->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
11fdf7f2
TL
1601 } else {
1602 ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
1603 if (ret < 0) {
9f95a23c 1604 ldout(sc->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
11fdf7f2
TL
1605 src_pg_ver = 0; /* all or nothing */
1606 }
1607 }
9f95a23c
TL
1608 ldout(sc->cct, 4) << "AWS: download begin: z=" << sc->source_zone
1609 << " b=" << src_bucket << " k=" << key << " size=" << size
11fdf7f2
TL
1610 << " mtime=" << mtime << " etag=" << etag
1611 << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
11fdf7f2
TL
1612 << dendl;
1613
9f95a23c 1614 source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
11fdf7f2 1615 if (!source_conn) {
9f95a23c 1616 ldout(sc->cct, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl;
11fdf7f2
TL
1617 return set_cr_error(-EINVAL);
1618 }
1619
9f95a23c
TL
1620 instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
1621 instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
11fdf7f2
TL
1622
1623 if (bucket_created.find(target_bucket_name) == bucket_created.end()){
1624 yield {
9f95a23c 1625 ldout(sc->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl;
11fdf7f2 1626 bufferlist bl;
9f95a23c 1627 call(new RGWPutRawRESTResourceCR <bufferlist> (sc->cct, target->conn.get(),
11fdf7f2
TL
1628 sync_env->http_manager,
1629 target_bucket_name, nullptr, bl, &out_bl));
1630 }
1631 if (retcode < 0 ) {
1632 RGWXMLDecoder::XMLParser parser;
1633 if (!parser.init()) {
9f95a23c 1634 ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
11fdf7f2
TL
1635 return set_cr_error(retcode);
1636 }
1637
1638 if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
1639 string str(out_bl.c_str(), out_bl.length());
9f95a23c 1640 ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
11fdf7f2
TL
1641 return set_cr_error(retcode);
1642 }
1643
1644 try {
1645 RGWXMLDecoder::decode_xml("Error", result, &parser, true);
1646 } catch (RGWXMLDecoder::err& err) {
1647 string str(out_bl.c_str(), out_bl.length());
9f95a23c 1648 ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
11fdf7f2
TL
1649 return set_cr_error(retcode);
1650 }
1651
1652 if (result.code != "BucketAlreadyOwnedByYou") {
1653 return set_cr_error(retcode);
1654 }
1655 }
1656
1657 bucket_created[target_bucket_name] = true;
1658 }
1659
1660 yield {
9f95a23c 1661 rgw_obj src_obj(src_bucket, key);
11fdf7f2
TL
1662
1663 /* init output */
1664 rgw_bucket target_bucket;
1665 target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
1666 uri resolution */
1667 rgw_obj dest_obj(target_bucket, target_obj_name);
1668
1669
1670 rgw_sync_aws_src_obj_properties src_properties;
1671 src_properties.mtime = mtime;
1672 src_properties.etag = etag;
1673 src_properties.zone_short_id = src_zone_short_id;
1674 src_properties.pg_ver = src_pg_ver;
1675 src_properties.versioned_epoch = versioned_epoch;
1676
1677 if (size < instance.conf.s3.multipart_sync_threshold) {
9f95a23c 1678 call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, src_obj,
11fdf7f2
TL
1679 src_properties,
1680 target,
1681 dest_obj));
1682 } else {
1683 rgw_rest_obj rest_obj;
1684 rest_obj.init(key);
9f95a23c
TL
1685 if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) {
1686 ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
11fdf7f2
TL
1687 return set_cr_error(-EINVAL);
1688 }
9f95a23c 1689 call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, src_obj,
11fdf7f2
TL
1690 target, dest_obj, size, src_properties, rest_obj));
1691 }
1692 }
1693 if (retcode < 0) {
1694 return set_cr_error(retcode);
1695 }
1696
1697 return set_cr_done();
1698 }
1699
1700 return 0;
1701 }
1702};
1703
1704class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
9f95a23c 1705 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1706 AWSSyncInstanceEnv& instance;
1707 uint64_t versioned_epoch;
1708public:
9f95a23c
TL
1709 RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1710 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1711 AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1712 sync_pipe(_sync_pipe),
11fdf7f2
TL
1713 instance(_instance), versioned_epoch(_versioned_epoch) {
1714 }
1715
1716 ~RGWAWSHandleRemoteObjCR() {}
1717
1718 RGWStatRemoteObjCBCR *allocate_callback() override {
9f95a23c 1719 return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch);
11fdf7f2
TL
1720 }
1721};
1722
1723class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
9f95a23c 1724 RGWDataSyncCtx *sc;
11fdf7f2 1725 std::shared_ptr<AWSSyncConfig_Profile> target;
9f95a23c 1726 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1727 rgw_obj_key key;
1728 ceph::real_time mtime;
1729 AWSSyncInstanceEnv& instance;
1730 int ret{0};
1731public:
9f95a23c
TL
1732 RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
1733 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1734 AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc),
1735 sync_pipe(_sync_pipe), key(_key),
11fdf7f2
TL
1736 mtime(_mtime), instance(_instance) {}
1737 int operate() override {
1738 reenter(this) {
9f95a23c
TL
1739 ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone
1740 << " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
11fdf7f2 1741 yield {
9f95a23c
TL
1742 instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
1743 string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
1744 ldout(sc->cct, 0) << "AWS: removing aws object at" << path << dendl;
11fdf7f2 1745
9f95a23c
TL
1746 call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(),
1747 sc->env->http_manager,
11fdf7f2
TL
1748 path, nullptr /* params */));
1749 }
1750 if (retcode < 0) {
1751 return set_cr_error(retcode);
1752 }
1753 return set_cr_done();
1754 }
1755 return 0;
1756 }
1757
1758};
1759
1760
1761class RGWAWSDataSyncModule: public RGWDataSyncModule {
1762 CephContext *cct;
1763 AWSSyncInstanceEnv instance;
1764public:
1765 RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) :
1766 cct(_cct),
1767 instance(_conf) {
1768 }
1769
9f95a23c
TL
1770 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1771 instance.init(sc, instance_id);
11fdf7f2
TL
1772 }
1773
1774 ~RGWAWSDataSyncModule() {}
1775
9f95a23c 1776 RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
11fdf7f2
TL
1777 std::optional<uint64_t> versioned_epoch,
1778 rgw_zone_set *zones_trace) override {
9f95a23c
TL
1779 ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1780 return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
11fdf7f2 1781 }
9f95a23c 1782 RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
11fdf7f2 1783 rgw_zone_set *zones_trace) override {
9f95a23c
TL
1784 ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1785 return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
11fdf7f2 1786 }
9f95a23c 1787 RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
11fdf7f2
TL
1788 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
1789 rgw_zone_set *zones_trace) override {
9f95a23c 1790 ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
11fdf7f2
TL
1791 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1792 return NULL;
1793 }
1794};
1795
1796class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
1797 RGWAWSDataSyncModule data_handler;
1798public:
1799 RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
1800 RGWDataSyncModule *get_data_handler() override {
1801 return &data_handler;
1802 }
1803};
1804
1805int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
1806 AWSSyncConfig conf;
1807
1808 int r = conf.init(cct, config);
1809 if (r < 0) {
1810 return r;
1811 }
1812
1813 instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
1814 return 0;
1815}