]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_bucket_sync.cc
377bd8f0566809f3181db1bba832103b3b4f27c3
[ceph.git] / ceph / src / rgw / rgw_bucket_sync.cc
1
2
3 #include "rgw_common.h"
4 #include "rgw_bucket_sync.h"
5 #include "rgw_data_sync.h"
6 #include "rgw_zone.h"
7
8 #include "services/svc_zone.h"
9 #include "services/svc_bucket_sync.h"
10
11 #define dout_subsys ceph_subsys_rgw
12
13
14 ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
15 os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or(rgw_zone_id()) << ",az=" << (int)e.all_zones << "}";
16 return os;
17 }
18
19 ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
20 os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
21 return os;
22 }
23
24 ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
25 os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<rgw_zone_id>()) << "}";
26 return os;
27 }
28
29 ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
30 os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
31 return os;
32 }
33
34 static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
35 const rgw_zone_id& source_zone,
36 const rgw_zone_id& dest_zone)
37 {
38 std::vector<rgw_sync_bucket_pipe> relevant_pipes;
39 for (auto& p : pipes) {
40 if (p.source.match_zone(source_zone) &&
41 p.dest.match_zone(dest_zone)) {
42 for (auto pipe : p.expand()) {
43 pipe.source.apply_zone(source_zone);
44 pipe.dest.apply_zone(dest_zone);
45 relevant_pipes.push_back(pipe);
46 }
47 }
48 }
49
50 return relevant_pipes;
51 }
52
53 static bool is_wildcard_bucket(const rgw_bucket& bucket)
54 {
55 return bucket.name.empty();
56 }
57
58 void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const
59 {
60 encode_json("zone", zone.id, f);
61 encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f);
62 encode_json("sources", sources, f);
63 encode_json("dests", dests, f);
64 }
65
66
67 template <typename CB1, typename CB2>
68 void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id& source_zone,
69 const rgw_zone_id& dest_zone,
70 const std::vector<rgw_sync_bucket_pipes>& pipes,
71 zb_pipe_map_t *pipe_map,
72 CB1 filter_cb,
73 CB2 call_filter_cb)
74 {
75 if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
76 return;
77 }
78 auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
79
80 for (auto& pipe : relevant_pipes) {
81 rgw_sync_bucket_entity zb;
82 if (!call_filter_cb(pipe, &zb)) {
83 continue;
84 }
85 pipe_map->insert(make_pair(zb, pipe));
86 }
87 }
88
89 template <typename CB>
90 void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id& source_zone,
91 const rgw_zone_id& dest_zone,
92 const std::vector<rgw_sync_bucket_pipes>& pipes,
93 CB filter_cb)
94 {
95 return try_add_to_pipe_map(source_zone, dest_zone, pipes,
96 &sources,
97 filter_cb,
98 [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) {
99 *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()};
100 return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
101 });
102 }
103
104 template <typename CB>
105 void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id& source_zone,
106 const rgw_zone_id& dest_zone,
107 const std::vector<rgw_sync_bucket_pipes>& pipes,
108 CB filter_cb)
109 {
110 return try_add_to_pipe_map(source_zone, dest_zone, pipes,
111 &dests,
112 filter_cb,
113 [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) {
114 *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()};
115 return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
116 });
117 }
118
119 using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t;
120
121 pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m,
122 const rgw_zone_id& zone,
123 std::optional<rgw_bucket> b) const
124 {
125 if (!b) {
126 return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()});
127 }
128
129 auto zb = rgw_sync_bucket_entity{zone, *b};
130
131 auto range = m.equal_range(zb);
132 if (range.first == range.second &&
133 !is_wildcard_bucket(*b)) {
134 /* couldn't find the specific bucket, try to find by wildcard */
135 zb.bucket = rgw_bucket();
136 range = m.equal_range(zb);
137 }
138
139 return range;
140 }
141
142
143 template <typename CB>
144 void rgw_sync_group_pipe_map::init(CephContext *cct,
145 const rgw_zone_id& _zone,
146 std::optional<rgw_bucket> _bucket,
147 const rgw_sync_policy_group& group,
148 rgw_sync_data_flow_group *_default_flow,
149 std::set<rgw_zone_id> *_pall_zones,
150 CB filter_cb) {
151 zone = _zone;
152 bucket = _bucket;
153 default_flow = _default_flow;
154 pall_zones = _pall_zones;
155
156 rgw_sync_bucket_entity zb(zone, bucket);
157
158 status = group.status;
159
160 std::vector<rgw_sync_bucket_pipes> zone_pipes;
161
162 string bucket_key = (bucket ? bucket->get_key() : "*");
163
164 /* only look at pipes that touch the specific zone and bucket */
165 for (auto& pipe : group.pipes) {
166 if (pipe.contains_zone_bucket(zone, bucket)) {
167 ldout(cct, 20) << __func__ << "(): pipe_map (zone=" << zone << " bucket=" << bucket_key << "): adding potential pipe: " << pipe << dendl;
168 zone_pipes.push_back(pipe);
169 }
170 }
171
172 const rgw_sync_data_flow_group *pflow;
173
174 if (!group.data_flow.empty()) {
175 pflow = &group.data_flow;
176 } else {
177 if (!default_flow) {
178 return;
179 }
180 pflow = default_flow;
181 }
182
183 auto& flow = *pflow;
184
185 pall_zones->insert(zone);
186
187 /* symmetrical */
188 for (auto& symmetrical_group : flow.symmetrical) {
189 if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
190 for (auto& z : symmetrical_group.zones) {
191 if (z != zone) {
192 pall_zones->insert(z);
193 try_add_source(z, zone, zone_pipes, filter_cb);
194 try_add_dest(zone, z, zone_pipes, filter_cb);
195 }
196 }
197 }
198 }
199
200 /* directional */
201 for (auto& rule : flow.directional) {
202 if (rule.source_zone == zone) {
203 pall_zones->insert(rule.dest_zone);
204 try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
205 } else if (rule.dest_zone == zone) {
206 pall_zones->insert(rule.source_zone);
207 try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
208 }
209 }
210 }
211
212 /*
213 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
214 */
215 vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id& source_zone,
216 std::optional<rgw_bucket> source_bucket,
217 std::optional<rgw_bucket> dest_bucket) const {
218 vector<rgw_sync_bucket_pipe> result;
219
220 auto range = find_pipes(sources, source_zone, source_bucket);
221
222 for (auto iter = range.first; iter != range.second; ++iter) {
223 auto pipe = iter->second;
224 if (pipe.dest.match_bucket(dest_bucket)) {
225 result.push_back(pipe);
226 }
227 }
228 return result;
229 }
230
231 /*
232 * find all relevant pipes in other zones that pull from a specific
233 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
234 */
235 vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket,
236 const rgw_zone_id& dest_zone,
237 std::optional<rgw_bucket> dest_bucket) const {
238 vector<rgw_sync_bucket_pipe> result;
239
240 auto range = find_pipes(dests, dest_zone, dest_bucket);
241
242 for (auto iter = range.first; iter != range.second; ++iter) {
243 auto pipe = iter->second;
244 if (pipe.source.match_bucket(source_bucket)) {
245 result.push_back(pipe);
246 }
247 }
248
249 return result;
250 }
251
252 /*
253 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
254 */
255 vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id& source_zone,
256 std::optional<rgw_bucket> source_bucket,
257 const rgw_zone_id& dest_zone,
258 std::optional<rgw_bucket> dest_bucket) const {
259 if (dest_zone == zone) {
260 return find_source_pipes(source_zone, source_bucket, dest_bucket);
261 }
262
263 if (source_zone == zone) {
264 return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
265 }
266
267 return vector<rgw_sync_bucket_pipe>();
268 }
269
270 void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
271 {
272 pipes.push_back(pipe);
273
274 auto ppipe = &pipes.back();
275 auto prefix = ppipe->params.source.filter.prefix.value_or(string());
276
277 prefix_refs.insert(make_pair(prefix, ppipe));
278
279 for (auto& t : ppipe->params.source.filter.tags) {
280 string tag = t.key + "=" + t.value;
281 auto titer = tag_refs.find(tag);
282 if (titer != tag_refs.end() &&
283 ppipe->params.priority > titer->second->params.priority) {
284 titer->second = ppipe;
285 } else {
286 tag_refs[tag] = ppipe;
287 }
288 }
289 }
290
291 bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
292 std::optional<rgw_user> *user,
293 std::optional<rgw_user> *acl_translation_owner,
294 std::optional<string> *storage_class,
295 rgw_sync_pipe_params::Mode *mode,
296 bool *need_more_info) const
297 {
298 std::optional<string> owner;
299
300 *need_more_info = false;
301
302 if (prefix_refs.empty()) {
303 return false;
304 }
305
306 auto end = prefix_refs.upper_bound(key.name);
307 auto iter = end;
308 if (iter != prefix_refs.begin()) {
309 --iter;
310 }
311 if (iter == prefix_refs.end()) {
312 return false;
313 }
314
315 if (iter != prefix_refs.begin()) {
316 iter = prefix_refs.find(iter->first); /* prefix_refs is multimap, find first element
317 holding that key */
318 }
319
320 std::vector<decltype(iter)> iters;
321
322 std::optional<int> priority;
323
324 for (; iter != end; ++iter) {
325 auto& prefix = iter->first;
326 if (!boost::starts_with(key.name, prefix)) {
327 continue;
328 }
329
330 auto& rule_params = iter->second->params;
331 auto& filter = rule_params.source.filter;
332
333 if (rule_params.priority > priority) {
334 priority = rule_params.priority;
335
336 if (!filter.has_tags()) {
337 iters.clear();
338 }
339 iters.push_back(iter);
340
341 *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then
342 we can't be sure if it would be used.
343 We need to first read the info from the source object */
344 }
345 }
346
347 if (iters.empty()) {
348 return false;
349 }
350
351 bool conflict = false;
352
353 std::optional<rgw_user> _user;
354 std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
355 std::optional<string> _storage_class;
356 rgw_sync_pipe_params::Mode _mode;
357
358 int i = 0;
359 for (auto& iter : iters) {
360 auto& rule_params = iter->second->params;
361 if (++i == 0) {
362 _user = rule_params.user;
363 _acl_translation = rule_params.dest.acl_translation;
364 _storage_class = rule_params.dest.storage_class;
365 _mode = rule_params.mode;
366 continue;
367 }
368
369 conflict = !(_user == rule_params.user &&
370 _acl_translation == rule_params.dest.acl_translation &&
371 _storage_class == rule_params.dest.storage_class &&
372 _mode == rule_params.mode);
373 if (conflict) {
374 *need_more_info = true;
375 return false;
376 }
377 }
378
379 *user = _user;
380 if (_acl_translation) {
381 *acl_translation_owner = _acl_translation->owner;
382 }
383 *storage_class = _storage_class;
384 *mode = _mode;
385
386 return true;
387 }
388
389 bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
390 const RGWObjTags::tag_map_t& tags,
391 rgw_sync_pipe_params *params) const
392 {
393 if (prefix_refs.empty()) {
394 return false;
395 }
396
397 auto iter = prefix_refs.upper_bound(key.name);
398 if (iter != prefix_refs.begin()) {
399 --iter;
400 }
401 if (iter == prefix_refs.end()) {
402 return false;
403 }
404
405 auto end = prefix_refs.upper_bound(key.name);
406 auto max = end;
407
408 std::optional<int> priority;
409
410 for (; iter != end; ++iter) {
411 /* NOTE: this is not the most efficient way to do it,
412 * a trie data structure would be better
413 */
414 auto& prefix = iter->first;
415 if (!boost::starts_with(key.name, prefix)) {
416 continue;
417 }
418
419 auto& rule_params = iter->second->params;
420 auto& filter = rule_params.source.filter;
421
422 if (!filter.check_tags(tags)) {
423 continue;
424 }
425
426 if (rule_params.priority > priority) {
427 priority = rule_params.priority;
428 max = iter;
429 }
430 }
431
432 if (max == end) {
433 return false;
434 }
435
436 *params = max->second->params;
437 return true;
438 }
439
440 /*
441 * return either the current prefix for s, or the next one if s is not within a prefix
442 */
443
444 RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const
445 {
446 if (prefix_refs.empty()) {
447 return prefix_refs.end();
448 }
449 auto next = prefix_refs.upper_bound(s);
450 auto iter = next;
451 if (iter != prefix_refs.begin()) {
452 --iter;
453 }
454 if (!boost::starts_with(s, iter->first)) {
455 return next;
456 }
457
458 return iter;
459 }
460
461 void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
462 pipe_map.insert(make_pair(pipe.id, pipe));
463
464 auto& rules_ref = rules[endpoints_pair(pipe)];
465
466 if (!rules_ref) {
467 rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>();
468 }
469
470 rules_ref->insert(pipe);
471
472 pipe_handler h(rules_ref, pipe);
473
474 handlers.insert(h);
475 }
476
477 void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
478 {
479 encode_json("pipes", pipe_map, f);
480 }
481
482 bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id& source_zone,
483 std::optional<rgw_bucket> source_bucket,
484 const rgw_zone_id& dest_zone,
485 std::optional<rgw_bucket> dest_bucket,
486 bool check_activated) const
487 {
488 bool found = false;
489 bool found_activated = false;
490
491 for (auto m : flow_groups) {
492 auto& fm = m.second;
493 auto pipes = fm.find_pipes(source_zone, source_bucket,
494 dest_zone, dest_bucket);
495
496 bool is_found = !pipes.empty();
497
498 if (is_found) {
499 switch (fm.status) {
500 case rgw_sync_policy_group::Status::FORBIDDEN:
501 return false;
502 case rgw_sync_policy_group::Status::ENABLED:
503 found = true;
504 found_activated = true;
505 break;
506 case rgw_sync_policy_group::Status::ALLOWED:
507 found = true;
508 break;
509 default:
510 break; /* unknown -- ignore */
511 }
512 }
513 }
514
515 if (check_activated && found_activated) {
516 return true;
517 }
518
519 return found;
520 }
521
522 void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
523 std::optional<rgw_sync_data_flow_group> default_flow;
524 if (parent) {
525 default_flow.emplace();
526 default_flow->init_default(parent->all_zones);
527 }
528
529 for (auto& item : sync_policy.groups) {
530 auto& group = item.second;
531 auto& flow_group_map = flow_groups[group.id];
532
533 flow_group_map.init(cct, zone_id, bucket, group,
534 (default_flow ? &(*default_flow) : nullptr),
535 &all_zones,
536 [&](const rgw_zone_id& source_zone,
537 std::optional<rgw_bucket> source_bucket,
538 const rgw_zone_id& dest_zone,
539 std::optional<rgw_bucket> dest_bucket) {
540 if (!parent) {
541 return true;
542 }
543 return parent->allowed_data_flow(source_zone,
544 source_bucket,
545 dest_zone,
546 dest_bucket,
547 false); /* just check that it's not disabled */
548 });
549 }
550 }
551
552 void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
553 RGWBucketSyncFlowManager::pipe_set *source_pipes,
554 RGWBucketSyncFlowManager::pipe_set *dest_pipes,
555 bool only_enabled) const
556
557 {
558 string effective_bucket_key;
559 if (effective_bucket) {
560 effective_bucket_key = effective_bucket->get_key();
561 }
562 if (parent) {
563 parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled);
564 }
565
566 for (auto& item : flow_groups) {
567 auto& flow_group_map = item.second;
568
569 /* only return enabled groups */
570 if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
571 (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
572 continue;
573 }
574
575 for (auto& entry : flow_group_map.sources) {
576 rgw_sync_bucket_pipe pipe = entry.second;
577 if (!pipe.dest.match_bucket(effective_bucket)) {
578 continue;
579 }
580
581 pipe.source.apply_bucket(effective_bucket);
582 pipe.dest.apply_bucket(effective_bucket);
583
584 ldout(cct, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
585 source_pipes->insert(pipe);
586 }
587
588 for (auto& entry : flow_group_map.dests) {
589 rgw_sync_bucket_pipe pipe = entry.second;
590
591 if (!pipe.source.match_bucket(effective_bucket)) {
592 continue;
593 }
594
595 pipe.source.apply_bucket(effective_bucket);
596 pipe.dest.apply_bucket(effective_bucket);
597
598 ldout(cct, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
599 dest_pipes->insert(pipe);
600 }
601 }
602 }
603
604
605 RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext *_cct,
606 const rgw_zone_id& _zone_id,
607 std::optional<rgw_bucket> _bucket,
608 const RGWBucketSyncFlowManager *_parent) : cct(_cct),
609 zone_id(_zone_id),
610 bucket(_bucket),
611 parent(_parent) {}
612
613
614 void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc,
615 RGWSI_SyncModules *sync_modules_svc,
616 rgw_sync_policy_info *ppolicy)
617 {
618 bool found = false;
619
620 rgw_sync_policy_info policy;
621
622 auto& group = policy.groups["default"];
623 auto& zonegroup = zone_svc->get_zonegroup();
624
625 for (const auto& ziter1 : zonegroup.zones) {
626 auto& id1 = ziter1.first;
627 const RGWZone& z1 = ziter1.second;
628
629 for (const auto& ziter2 : zonegroup.zones) {
630 auto& id2 = ziter2.first;
631 const RGWZone& z2 = ziter2.second;
632
633 if (id1 == id2) {
634 continue;
635 }
636
637 if (z1.syncs_from(z2.name)) {
638 found = true;
639 rgw_sync_directional_rule *rule;
640 group.data_flow.find_or_create_directional(id2,
641 id1,
642 &rule);
643 }
644 }
645 }
646
647 if (!found) { /* nothing syncs */
648 return;
649 }
650
651 rgw_sync_bucket_pipes pipes;
652 pipes.id = "all";
653 pipes.source.all_zones = true;
654 pipes.dest.all_zones = true;
655
656 group.pipes.emplace_back(std::move(pipes));
657
658
659 group.status = rgw_sync_policy_group::Status::ENABLED;
660
661 *ppolicy = std::move(policy);
662 }
663
664 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
665 RGWSI_SyncModules *sync_modules_svc,
666 RGWSI_Bucket_Sync *_bucket_sync_svc,
667 std::optional<rgw_zone_id> effective_zone) : zone_svc(_zone_svc) ,
668 bucket_sync_svc(_bucket_sync_svc) {
669 zone_id = effective_zone.value_or(zone_svc->zone_id());
670 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
671 zone_id,
672 nullopt,
673 nullptr));
674 sync_policy = zone_svc->get_zonegroup().sync_policy;
675
676 if (sync_policy.empty()) {
677 RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy);
678 legacy_config = true;
679 }
680 }
681
682 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
683 const RGWBucketInfo& _bucket_info,
684 map<string, bufferlist>&& _bucket_attrs) : parent(_parent),
685 bucket_info(_bucket_info),
686 bucket_attrs(std::move(_bucket_attrs)) {
687 if (_bucket_info.sync_policy) {
688 sync_policy = *_bucket_info.sync_policy;
689
690 for (auto& entry : sync_policy.groups) {
691 for (auto& pipe : entry.second.pipes) {
692 if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER &&
693 pipe.params.user.empty()) {
694 pipe.params.user = _bucket_info.owner;
695 }
696 }
697 }
698 }
699 legacy_config = parent->legacy_config;
700 bucket = _bucket_info.bucket;
701 zone_svc = parent->zone_svc;
702 bucket_sync_svc = parent->bucket_sync_svc;
703 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
704 parent->zone_id,
705 _bucket_info.bucket,
706 parent->flow_mgr.get()));
707 }
708
709 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
710 const rgw_bucket& _bucket,
711 std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) {
712 if (_sync_policy) {
713 sync_policy = *_sync_policy;
714 }
715 legacy_config = parent->legacy_config;
716 bucket = _bucket;
717 zone_svc = parent->zone_svc;
718 bucket_sync_svc = parent->bucket_sync_svc;
719 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
720 parent->zone_id,
721 _bucket,
722 parent->flow_mgr.get()));
723 }
724
725 RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info,
726 map<string, bufferlist>&& bucket_attrs) const
727 {
728 return new RGWBucketSyncPolicyHandler(this, bucket_info, std::move(bucket_attrs));
729 }
730
731 RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket,
732 std::optional<rgw_sync_policy_info> sync_policy) const
733 {
734 return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
735 }
736
737 int RGWBucketSyncPolicyHandler::init(optional_yield y)
738 {
739 int r = bucket_sync_svc->get_bucket_sync_hints(bucket.value_or(rgw_bucket()),
740 &source_hints,
741 &target_hints,
742 y);
743 if (r < 0) {
744 ldout(bucket_sync_svc->ctx(), 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket="
745 << bucket << " returned r=" << r << dendl;
746 return r;
747 }
748
749 flow_mgr->init(sync_policy);
750
751 reflect(&source_pipes,
752 &target_pipes,
753 &sources,
754 &targets,
755 &source_zones,
756 &target_zones,
757 true);
758
759 return 0;
760 }
761
762 void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psource_pipes,
763 RGWBucketSyncFlowManager::pipe_set *ptarget_pipes,
764 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources,
765 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets,
766 std::set<rgw_zone_id> *psource_zones,
767 std::set<rgw_zone_id> *ptarget_zones,
768 bool only_enabled) const
769 {
770 RGWBucketSyncFlowManager::pipe_set _source_pipes;
771 RGWBucketSyncFlowManager::pipe_set _target_pipes;
772 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _sources;
773 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _targets;
774 std::set<rgw_zone_id> _source_zones;
775 std::set<rgw_zone_id> _target_zones;
776
777 flow_mgr->reflect(bucket, &_source_pipes, &_target_pipes, only_enabled);
778
779 for (auto& entry : _source_pipes.pipe_map) {
780 auto& pipe = entry.second;
781 if (!pipe.source.zone) {
782 continue;
783 }
784 _source_zones.insert(*pipe.source.zone);
785 _sources[*pipe.source.zone].insert(pipe);
786 }
787
788 for (auto& entry : _target_pipes.pipe_map) {
789 auto& pipe = entry.second;
790 if (!pipe.dest.zone) {
791 continue;
792 }
793 _target_zones.insert(*pipe.dest.zone);
794 _targets[*pipe.dest.zone].insert(pipe);
795 }
796
797 if (psource_pipes) {
798 *psource_pipes = std::move(_source_pipes);
799 }
800 if (ptarget_pipes) {
801 *ptarget_pipes = std::move(_target_pipes);
802 }
803 if (psources) {
804 *psources = std::move(_sources);
805 }
806 if (ptargets) {
807 *ptargets = std::move(_targets);
808 }
809 if (psource_zones) {
810 *psource_zones = std::move(_source_zones);
811 }
812 if (ptarget_zones) {
813 *ptarget_zones = std::move(_target_zones);
814 }
815 }
816
817 multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const
818 {
819 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
820
821 for (auto& source_entry : sources) {
822 auto& zone_id = source_entry.first;
823
824 auto& pipes = source_entry.second.pipe_map;
825
826 for (auto& entry : pipes) {
827 auto& pipe = entry.second;
828 m.insert(make_pair(zone_id, pipe));
829 }
830 }
831
832 for (auto& pipe : resolved_sources) {
833 if (!pipe.source.zone) {
834 continue;
835 }
836
837 m.insert(make_pair(*pipe.source.zone, pipe));
838 }
839
840 return m;
841 }
842
843 multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const
844 {
845 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
846
847 for (auto& dest_entry : targets) {
848 auto& zone_id = dest_entry.first;
849
850 auto& pipes = dest_entry.second.pipe_map;
851
852 for (auto& entry : pipes) {
853 auto& pipe = entry.second;
854 m.insert(make_pair(zone_id, pipe));
855 }
856 }
857
858 for (auto& pipe : resolved_dests) {
859 if (!pipe.dest.zone) {
860 continue;
861 }
862
863 m.insert(make_pair(*pipe.dest.zone, pipe));
864 }
865
866 return m;
867 }
868
869 multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const
870 {
871 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
872
873 auto iter = targets.find(zone_id);
874 if (iter != targets.end()) {
875 auto& pipes = iter->second.pipe_map;
876
877 for (auto& entry : pipes) {
878 auto& pipe = entry.second;
879 m.insert(make_pair(zone_id, pipe));
880 }
881 }
882
883 for (auto& pipe : resolved_dests) {
884 if (!pipe.dest.zone ||
885 *pipe.dest.zone != zone_id) {
886 continue;
887 }
888
889 m.insert(make_pair(*pipe.dest.zone, pipe));
890 }
891
892 return m;
893 }
894
895 void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
896 std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */
897 for (auto& entry : source_pipes.pipe_map) {
898 auto& source_pipe = entry.second;
899 if (!filter_peer ||
900 source_pipe.source.match(*filter_peer)) {
901 _sources->insert(source_pipe);
902 }
903 }
904
905 for (auto& entry : target_pipes.pipe_map) {
906 auto& target_pipe = entry.second;
907 if (!filter_peer ||
908 target_pipe.dest.match(*filter_peer)) {
909 _targets->insert(target_pipe);
910 }
911 }
912 }
913
914 bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
915 {
916 if (!bucket) {
917 return false;
918 }
919
920 if (bucket_is_sync_source()) {
921 return true;
922 }
923
924 return (zone_svc->need_to_log_data() &&
925 bucket_info->datasync_flag_enabled());
926 }
927
928 bool RGWBucketSyncPolicyHandler::bucket_imports_data() const
929 {
930 return bucket_is_sync_target();
931 }
932