3 #include "rgw_common.h"
4 #include "rgw_bucket_sync.h"
5 #include "rgw_data_sync.h"
8 #include "services/svc_zone.h"
9 #include "services/svc_bucket_sync.h"
11 #define dout_subsys ceph_subsys_rgw
14 ostream
& operator<<(ostream
& os
, const rgw_sync_bucket_entity
& e
) {
15 os
<< "{b=" << rgw_sync_bucket_entities::bucket_key(e
.bucket
) << ",z=" << e
.zone
.value_or(rgw_zone_id()) << ",az=" << (int)e
.all_zones
<< "}";
19 ostream
& operator<<(ostream
& os
, const rgw_sync_bucket_pipe
& pipe
) {
20 os
<< "{s=" << pipe
.source
<< ",d=" << pipe
.dest
<< "}";
24 ostream
& operator<<(ostream
& os
, const rgw_sync_bucket_entities
& e
) {
25 os
<< "{b=" << rgw_sync_bucket_entities::bucket_key(e
.bucket
) << ",z=" << e
.zones
.value_or(std::set
<rgw_zone_id
>()) << "}";
29 ostream
& operator<<(ostream
& os
, const rgw_sync_bucket_pipes
& pipe
) {
30 os
<< "{id=" << pipe
.id
<< ",s=" << pipe
.source
<< ",d=" << pipe
.dest
<< "}";
34 static std::vector
<rgw_sync_bucket_pipe
> filter_relevant_pipes(const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
35 const rgw_zone_id
& source_zone
,
36 const rgw_zone_id
& dest_zone
)
38 std::vector
<rgw_sync_bucket_pipe
> relevant_pipes
;
39 for (auto& p
: pipes
) {
40 if (p
.source
.match_zone(source_zone
) &&
41 p
.dest
.match_zone(dest_zone
)) {
42 for (auto pipe
: p
.expand()) {
43 pipe
.source
.apply_zone(source_zone
);
44 pipe
.dest
.apply_zone(dest_zone
);
45 relevant_pipes
.push_back(pipe
);
50 return relevant_pipes
;
53 static bool is_wildcard_bucket(const rgw_bucket
& bucket
)
55 return bucket
.name
.empty();
58 void rgw_sync_group_pipe_map::dump(ceph::Formatter
*f
) const
60 encode_json("zone", zone
.id
, f
);
61 encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket
), f
);
62 encode_json("sources", sources
, f
);
63 encode_json("dests", dests
, f
);
67 template <typename CB1
, typename CB2
>
68 void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id
& source_zone
,
69 const rgw_zone_id
& dest_zone
,
70 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
71 zb_pipe_map_t
*pipe_map
,
75 if (!filter_cb(source_zone
, nullopt
, dest_zone
, nullopt
)) {
78 auto relevant_pipes
= filter_relevant_pipes(pipes
, source_zone
, dest_zone
);
80 for (auto& pipe
: relevant_pipes
) {
81 rgw_sync_bucket_entity zb
;
82 if (!call_filter_cb(pipe
, &zb
)) {
85 pipe_map
->insert(make_pair(zb
, pipe
));
89 template <typename CB
>
90 void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id
& source_zone
,
91 const rgw_zone_id
& dest_zone
,
92 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
95 return try_add_to_pipe_map(source_zone
, dest_zone
, pipes
,
98 [&](const rgw_sync_bucket_pipe
& pipe
, rgw_sync_bucket_entity
*zb
) {
99 *zb
= rgw_sync_bucket_entity
{source_zone
, pipe
.source
.get_bucket()};
100 return filter_cb(source_zone
, zb
->bucket
, dest_zone
, pipe
.dest
.get_bucket());
104 template <typename CB
>
105 void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id
& source_zone
,
106 const rgw_zone_id
& dest_zone
,
107 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
110 return try_add_to_pipe_map(source_zone
, dest_zone
, pipes
,
113 [&](const rgw_sync_bucket_pipe
& pipe
, rgw_sync_bucket_entity
*zb
) {
114 *zb
= rgw_sync_bucket_entity
{dest_zone
, pipe
.dest
.get_bucket()};
115 return filter_cb(source_zone
, pipe
.source
.get_bucket(), dest_zone
, zb
->bucket
);
119 using zb_pipe_map_t
= rgw_sync_group_pipe_map::zb_pipe_map_t
;
121 pair
<zb_pipe_map_t::const_iterator
, zb_pipe_map_t::const_iterator
> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t
& m
,
122 const rgw_zone_id
& zone
,
123 std::optional
<rgw_bucket
> b
) const
126 return m
.equal_range(rgw_sync_bucket_entity
{zone
, rgw_bucket()});
129 auto zb
= rgw_sync_bucket_entity
{zone
, *b
};
131 auto range
= m
.equal_range(zb
);
132 if (range
.first
== range
.second
&&
133 !is_wildcard_bucket(*b
)) {
134 /* couldn't find the specific bucket, try to find by wildcard */
135 zb
.bucket
= rgw_bucket();
136 range
= m
.equal_range(zb
);
143 template <typename CB
>
144 void rgw_sync_group_pipe_map::init(CephContext
*cct
,
145 const rgw_zone_id
& _zone
,
146 std::optional
<rgw_bucket
> _bucket
,
147 const rgw_sync_policy_group
& group
,
148 rgw_sync_data_flow_group
*_default_flow
,
149 std::set
<rgw_zone_id
> *_pall_zones
,
153 default_flow
= _default_flow
;
154 pall_zones
= _pall_zones
;
156 rgw_sync_bucket_entity
zb(zone
, bucket
);
158 status
= group
.status
;
160 std::vector
<rgw_sync_bucket_pipes
> zone_pipes
;
162 string bucket_key
= (bucket
? bucket
->get_key() : "*");
164 /* only look at pipes that touch the specific zone and bucket */
165 for (auto& pipe
: group
.pipes
) {
166 if (pipe
.contains_zone_bucket(zone
, bucket
)) {
167 ldout(cct
, 20) << __func__
<< "(): pipe_map (zone=" << zone
<< " bucket=" << bucket_key
<< "): adding potential pipe: " << pipe
<< dendl
;
168 zone_pipes
.push_back(pipe
);
172 const rgw_sync_data_flow_group
*pflow
;
174 if (!group
.data_flow
.empty()) {
175 pflow
= &group
.data_flow
;
180 pflow
= default_flow
;
185 pall_zones
->insert(zone
);
188 for (auto& symmetrical_group
: flow
.symmetrical
) {
189 if (symmetrical_group
.zones
.find(zone
) != symmetrical_group
.zones
.end()) {
190 for (auto& z
: symmetrical_group
.zones
) {
192 pall_zones
->insert(z
);
193 try_add_source(z
, zone
, zone_pipes
, filter_cb
);
194 try_add_dest(zone
, z
, zone_pipes
, filter_cb
);
201 for (auto& rule
: flow
.directional
) {
202 if (rule
.source_zone
== zone
) {
203 pall_zones
->insert(rule
.dest_zone
);
204 try_add_dest(zone
, rule
.dest_zone
, zone_pipes
, filter_cb
);
205 } else if (rule
.dest_zone
== zone
) {
206 pall_zones
->insert(rule
.source_zone
);
207 try_add_source(rule
.source_zone
, zone
, zone_pipes
, filter_cb
);
213 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
215 vector
<rgw_sync_bucket_pipe
> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id
& source_zone
,
216 std::optional
<rgw_bucket
> source_bucket
,
217 std::optional
<rgw_bucket
> dest_bucket
) const {
218 vector
<rgw_sync_bucket_pipe
> result
;
220 auto range
= find_pipes(sources
, source_zone
, source_bucket
);
222 for (auto iter
= range
.first
; iter
!= range
.second
; ++iter
) {
223 auto pipe
= iter
->second
;
224 if (pipe
.dest
.match_bucket(dest_bucket
)) {
225 result
.push_back(pipe
);
232 * find all relevant pipes in other zones that pull from a specific
233 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
235 vector
<rgw_sync_bucket_pipe
> rgw_sync_group_pipe_map::find_dest_pipes(std::optional
<rgw_bucket
> source_bucket
,
236 const rgw_zone_id
& dest_zone
,
237 std::optional
<rgw_bucket
> dest_bucket
) const {
238 vector
<rgw_sync_bucket_pipe
> result
;
240 auto range
= find_pipes(dests
, dest_zone
, dest_bucket
);
242 for (auto iter
= range
.first
; iter
!= range
.second
; ++iter
) {
243 auto pipe
= iter
->second
;
244 if (pipe
.source
.match_bucket(source_bucket
)) {
245 result
.push_back(pipe
);
253 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
255 vector
<rgw_sync_bucket_pipe
> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id
& source_zone
,
256 std::optional
<rgw_bucket
> source_bucket
,
257 const rgw_zone_id
& dest_zone
,
258 std::optional
<rgw_bucket
> dest_bucket
) const {
259 if (dest_zone
== zone
) {
260 return find_source_pipes(source_zone
, source_bucket
, dest_bucket
);
263 if (source_zone
== zone
) {
264 return find_dest_pipes(source_bucket
, dest_zone
, dest_bucket
);
267 return vector
<rgw_sync_bucket_pipe
>();
270 void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe
& pipe
)
272 pipes
.push_back(pipe
);
274 auto ppipe
= &pipes
.back();
275 auto prefix
= ppipe
->params
.source
.filter
.prefix
.value_or(string());
277 prefix_refs
.insert(make_pair(prefix
, ppipe
));
279 for (auto& t
: ppipe
->params
.source
.filter
.tags
) {
280 string tag
= t
.key
+ "=" + t
.value
;
281 auto titer
= tag_refs
.find(tag
);
282 if (titer
!= tag_refs
.end() &&
283 ppipe
->params
.priority
> titer
->second
->params
.priority
) {
284 titer
->second
= ppipe
;
286 tag_refs
[tag
] = ppipe
;
291 bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key
& key
,
292 std::optional
<rgw_user
> *user
,
293 std::optional
<rgw_user
> *acl_translation_owner
,
294 std::optional
<string
> *storage_class
,
295 rgw_sync_pipe_params::Mode
*mode
,
296 bool *need_more_info
) const
298 std::optional
<string
> owner
;
300 *need_more_info
= false;
302 if (prefix_refs
.empty()) {
306 auto end
= prefix_refs
.upper_bound(key
.name
);
308 if (iter
!= prefix_refs
.begin()) {
311 if (iter
== prefix_refs
.end()) {
315 if (iter
!= prefix_refs
.begin()) {
316 iter
= prefix_refs
.find(iter
->first
); /* prefix_refs is multimap, find first element
320 std::vector
<decltype(iter
)> iters
;
322 std::optional
<int> priority
;
324 for (; iter
!= end
; ++iter
) {
325 auto& prefix
= iter
->first
;
326 if (!boost::starts_with(key
.name
, prefix
)) {
330 auto& rule_params
= iter
->second
->params
;
331 auto& filter
= rule_params
.source
.filter
;
333 if (rule_params
.priority
> priority
) {
334 priority
= rule_params
.priority
;
336 if (!filter
.has_tags()) {
339 iters
.push_back(iter
);
341 *need_more_info
= filter
.has_tags(); /* if highest priority filter has tags, then
342 we can't be sure if it would be used.
343 We need to first read the info from the source object */
351 bool conflict
= false;
353 std::optional
<rgw_user
> _user
;
354 std::optional
<rgw_sync_pipe_acl_translation
> _acl_translation
;
355 std::optional
<string
> _storage_class
;
356 rgw_sync_pipe_params::Mode _mode
;
359 for (auto& iter
: iters
) {
360 auto& rule_params
= iter
->second
->params
;
362 _user
= rule_params
.user
;
363 _acl_translation
= rule_params
.dest
.acl_translation
;
364 _storage_class
= rule_params
.dest
.storage_class
;
365 _mode
= rule_params
.mode
;
369 conflict
= !(_user
== rule_params
.user
&&
370 _acl_translation
== rule_params
.dest
.acl_translation
&&
371 _storage_class
== rule_params
.dest
.storage_class
&&
372 _mode
== rule_params
.mode
);
374 *need_more_info
= true;
380 if (_acl_translation
) {
381 *acl_translation_owner
= _acl_translation
->owner
;
383 *storage_class
= _storage_class
;
389 bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key
& key
,
390 const RGWObjTags::tag_map_t
& tags
,
391 rgw_sync_pipe_params
*params
) const
393 if (prefix_refs
.empty()) {
397 auto iter
= prefix_refs
.upper_bound(key
.name
);
398 if (iter
!= prefix_refs
.begin()) {
401 if (iter
== prefix_refs
.end()) {
405 auto end
= prefix_refs
.upper_bound(key
.name
);
408 std::optional
<int> priority
;
410 for (; iter
!= end
; ++iter
) {
411 /* NOTE: this is not the most efficient way to do it,
412 * a trie data structure would be better
414 auto& prefix
= iter
->first
;
415 if (!boost::starts_with(key
.name
, prefix
)) {
419 auto& rule_params
= iter
->second
->params
;
420 auto& filter
= rule_params
.source
.filter
;
422 if (!filter
.check_tags(tags
)) {
426 if (rule_params
.priority
> priority
) {
427 priority
= rule_params
.priority
;
436 *params
= max
->second
->params
;
441 * return either the current prefix for s, or the next one if s is not within a prefix
444 RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator
RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string
& s
) const
446 if (prefix_refs
.empty()) {
447 return prefix_refs
.end();
449 auto next
= prefix_refs
.upper_bound(s
);
451 if (iter
!= prefix_refs
.begin()) {
454 if (!boost::starts_with(s
, iter
->first
)) {
461 void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe
& pipe
) {
462 pipe_map
.insert(make_pair(pipe
.id
, pipe
));
464 auto& rules_ref
= rules
[endpoints_pair(pipe
)];
467 rules_ref
= make_shared
<RGWBucketSyncFlowManager::pipe_rules
>();
470 rules_ref
->insert(pipe
);
472 pipe_handler
h(rules_ref
, pipe
);
477 void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter
*f
) const
479 encode_json("pipes", pipe_map
, f
);
482 bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id
& source_zone
,
483 std::optional
<rgw_bucket
> source_bucket
,
484 const rgw_zone_id
& dest_zone
,
485 std::optional
<rgw_bucket
> dest_bucket
,
486 bool check_activated
) const
489 bool found_activated
= false;
491 for (auto m
: flow_groups
) {
493 auto pipes
= fm
.find_pipes(source_zone
, source_bucket
,
494 dest_zone
, dest_bucket
);
496 bool is_found
= !pipes
.empty();
500 case rgw_sync_policy_group::Status::FORBIDDEN
:
502 case rgw_sync_policy_group::Status::ENABLED
:
504 found_activated
= true;
506 case rgw_sync_policy_group::Status::ALLOWED
:
510 break; /* unknown -- ignore */
515 if (check_activated
&& found_activated
) {
522 void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info
& sync_policy
) {
523 std::optional
<rgw_sync_data_flow_group
> default_flow
;
525 default_flow
.emplace();
526 default_flow
->init_default(parent
->all_zones
);
529 for (auto& item
: sync_policy
.groups
) {
530 auto& group
= item
.second
;
531 auto& flow_group_map
= flow_groups
[group
.id
];
533 flow_group_map
.init(cct
, zone_id
, bucket
, group
,
534 (default_flow
? &(*default_flow
) : nullptr),
536 [&](const rgw_zone_id
& source_zone
,
537 std::optional
<rgw_bucket
> source_bucket
,
538 const rgw_zone_id
& dest_zone
,
539 std::optional
<rgw_bucket
> dest_bucket
) {
543 return parent
->allowed_data_flow(source_zone
,
547 false); /* just check that it's not disabled */
552 void RGWBucketSyncFlowManager::reflect(std::optional
<rgw_bucket
> effective_bucket
,
553 RGWBucketSyncFlowManager::pipe_set
*source_pipes
,
554 RGWBucketSyncFlowManager::pipe_set
*dest_pipes
,
555 bool only_enabled
) const
558 string effective_bucket_key
;
559 if (effective_bucket
) {
560 effective_bucket_key
= effective_bucket
->get_key();
563 parent
->reflect(effective_bucket
, source_pipes
, dest_pipes
, only_enabled
);
566 for (auto& item
: flow_groups
) {
567 auto& flow_group_map
= item
.second
;
569 /* only return enabled groups */
570 if (flow_group_map
.status
!= rgw_sync_policy_group::Status::ENABLED
&&
571 (only_enabled
|| flow_group_map
.status
!= rgw_sync_policy_group::Status::ALLOWED
)) {
575 for (auto& entry
: flow_group_map
.sources
) {
576 rgw_sync_bucket_pipe pipe
= entry
.second
;
577 if (!pipe
.dest
.match_bucket(effective_bucket
)) {
581 pipe
.source
.apply_bucket(effective_bucket
);
582 pipe
.dest
.apply_bucket(effective_bucket
);
584 ldout(cct
, 20) << __func__
<< "(): flow manager (bucket=" << effective_bucket_key
<< "): adding source pipe: " << pipe
<< dendl
;
585 source_pipes
->insert(pipe
);
588 for (auto& entry
: flow_group_map
.dests
) {
589 rgw_sync_bucket_pipe pipe
= entry
.second
;
591 if (!pipe
.source
.match_bucket(effective_bucket
)) {
595 pipe
.source
.apply_bucket(effective_bucket
);
596 pipe
.dest
.apply_bucket(effective_bucket
);
598 ldout(cct
, 20) << __func__
<< "(): flow manager (bucket=" << effective_bucket_key
<< "): adding dest pipe: " << pipe
<< dendl
;
599 dest_pipes
->insert(pipe
);
605 RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext
*_cct
,
606 const rgw_zone_id
& _zone_id
,
607 std::optional
<rgw_bucket
> _bucket
,
608 const RGWBucketSyncFlowManager
*_parent
) : cct(_cct
),
614 void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone
*zone_svc
,
615 RGWSI_SyncModules
*sync_modules_svc
,
616 rgw_sync_policy_info
*ppolicy
)
620 rgw_sync_policy_info policy
;
622 auto& group
= policy
.groups
["default"];
623 auto& zonegroup
= zone_svc
->get_zonegroup();
625 for (const auto& ziter1
: zonegroup
.zones
) {
626 auto& id1
= ziter1
.first
;
627 const RGWZone
& z1
= ziter1
.second
;
629 for (const auto& ziter2
: zonegroup
.zones
) {
630 auto& id2
= ziter2
.first
;
631 const RGWZone
& z2
= ziter2
.second
;
637 if (z1
.syncs_from(z2
.name
)) {
639 rgw_sync_directional_rule
*rule
;
640 group
.data_flow
.find_or_create_directional(id2
,
647 if (!found
) { /* nothing syncs */
651 rgw_sync_bucket_pipes pipes
;
653 pipes
.source
.all_zones
= true;
654 pipes
.dest
.all_zones
= true;
656 group
.pipes
.emplace_back(std::move(pipes
));
659 group
.status
= rgw_sync_policy_group::Status::ENABLED
;
661 *ppolicy
= std::move(policy
);
664 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone
*_zone_svc
,
665 RGWSI_SyncModules
*sync_modules_svc
,
666 RGWSI_Bucket_Sync
*_bucket_sync_svc
,
667 std::optional
<rgw_zone_id
> effective_zone
) : zone_svc(_zone_svc
) ,
668 bucket_sync_svc(_bucket_sync_svc
) {
669 zone_id
= effective_zone
.value_or(zone_svc
->zone_id());
670 flow_mgr
.reset(new RGWBucketSyncFlowManager(zone_svc
->ctx(),
674 sync_policy
= zone_svc
->get_zonegroup().sync_policy
;
676 if (sync_policy
.empty()) {
677 RGWSyncPolicyCompat::convert_old_sync_config(zone_svc
, sync_modules_svc
, &sync_policy
);
678 legacy_config
= true;
682 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler
*_parent
,
683 const RGWBucketInfo
& _bucket_info
,
684 map
<string
, bufferlist
>&& _bucket_attrs
) : parent(_parent
),
685 bucket_info(_bucket_info
),
686 bucket_attrs(std::move(_bucket_attrs
)) {
687 if (_bucket_info
.sync_policy
) {
688 sync_policy
= *_bucket_info
.sync_policy
;
690 for (auto& entry
: sync_policy
.groups
) {
691 for (auto& pipe
: entry
.second
.pipes
) {
692 if (pipe
.params
.mode
== rgw_sync_pipe_params::MODE_USER
&&
693 pipe
.params
.user
.empty()) {
694 pipe
.params
.user
= _bucket_info
.owner
;
699 legacy_config
= parent
->legacy_config
;
700 bucket
= _bucket_info
.bucket
;
701 zone_svc
= parent
->zone_svc
;
702 bucket_sync_svc
= parent
->bucket_sync_svc
;
703 flow_mgr
.reset(new RGWBucketSyncFlowManager(zone_svc
->ctx(),
706 parent
->flow_mgr
.get()));
709 RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler
*_parent
,
710 const rgw_bucket
& _bucket
,
711 std::optional
<rgw_sync_policy_info
> _sync_policy
) : parent(_parent
) {
713 sync_policy
= *_sync_policy
;
715 legacy_config
= parent
->legacy_config
;
717 zone_svc
= parent
->zone_svc
;
718 bucket_sync_svc
= parent
->bucket_sync_svc
;
719 flow_mgr
.reset(new RGWBucketSyncFlowManager(zone_svc
->ctx(),
722 parent
->flow_mgr
.get()));
725 RGWBucketSyncPolicyHandler
*RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo
& bucket_info
,
726 map
<string
, bufferlist
>&& bucket_attrs
) const
728 return new RGWBucketSyncPolicyHandler(this, bucket_info
, std::move(bucket_attrs
));
731 RGWBucketSyncPolicyHandler
*RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket
& bucket
,
732 std::optional
<rgw_sync_policy_info
> sync_policy
) const
734 return new RGWBucketSyncPolicyHandler(this, bucket
, sync_policy
);
737 int RGWBucketSyncPolicyHandler::init(const DoutPrefixProvider
*dpp
, optional_yield y
)
739 int r
= bucket_sync_svc
->get_bucket_sync_hints(dpp
, bucket
.value_or(rgw_bucket()),
744 ldpp_dout(dpp
, 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket="
745 << bucket
<< " returned r=" << r
<< dendl
;
749 flow_mgr
->init(sync_policy
);
751 reflect(&source_pipes
,
762 void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set
*psource_pipes
,
763 RGWBucketSyncFlowManager::pipe_set
*ptarget_pipes
,
764 map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> *psources
,
765 map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> *ptargets
,
766 std::set
<rgw_zone_id
> *psource_zones
,
767 std::set
<rgw_zone_id
> *ptarget_zones
,
768 bool only_enabled
) const
770 RGWBucketSyncFlowManager::pipe_set _source_pipes
;
771 RGWBucketSyncFlowManager::pipe_set _target_pipes
;
772 map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> _sources
;
773 map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> _targets
;
774 std::set
<rgw_zone_id
> _source_zones
;
775 std::set
<rgw_zone_id
> _target_zones
;
777 flow_mgr
->reflect(bucket
, &_source_pipes
, &_target_pipes
, only_enabled
);
779 for (auto& entry
: _source_pipes
.pipe_map
) {
780 auto& pipe
= entry
.second
;
781 if (!pipe
.source
.zone
) {
784 _source_zones
.insert(*pipe
.source
.zone
);
785 _sources
[*pipe
.source
.zone
].insert(pipe
);
788 for (auto& entry
: _target_pipes
.pipe_map
) {
789 auto& pipe
= entry
.second
;
790 if (!pipe
.dest
.zone
) {
793 _target_zones
.insert(*pipe
.dest
.zone
);
794 _targets
[*pipe
.dest
.zone
].insert(pipe
);
798 *psource_pipes
= std::move(_source_pipes
);
801 *ptarget_pipes
= std::move(_target_pipes
);
804 *psources
= std::move(_sources
);
807 *ptargets
= std::move(_targets
);
810 *psource_zones
= std::move(_source_zones
);
813 *ptarget_zones
= std::move(_target_zones
);
817 multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> RGWBucketSyncPolicyHandler::get_all_sources() const
819 multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> m
;
821 for (auto& source_entry
: sources
) {
822 auto& zone_id
= source_entry
.first
;
824 auto& pipes
= source_entry
.second
.pipe_map
;
826 for (auto& entry
: pipes
) {
827 auto& pipe
= entry
.second
;
828 m
.insert(make_pair(zone_id
, pipe
));
832 for (auto& pipe
: resolved_sources
) {
833 if (!pipe
.source
.zone
) {
837 m
.insert(make_pair(*pipe
.source
.zone
, pipe
));
843 multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> RGWBucketSyncPolicyHandler::get_all_dests() const
845 multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> m
;
847 for (auto& dest_entry
: targets
) {
848 auto& zone_id
= dest_entry
.first
;
850 auto& pipes
= dest_entry
.second
.pipe_map
;
852 for (auto& entry
: pipes
) {
853 auto& pipe
= entry
.second
;
854 m
.insert(make_pair(zone_id
, pipe
));
858 for (auto& pipe
: resolved_dests
) {
859 if (!pipe
.dest
.zone
) {
863 m
.insert(make_pair(*pipe
.dest
.zone
, pipe
));
869 multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id
& zone_id
) const
871 multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> m
;
873 auto iter
= targets
.find(zone_id
);
874 if (iter
!= targets
.end()) {
875 auto& pipes
= iter
->second
.pipe_map
;
877 for (auto& entry
: pipes
) {
878 auto& pipe
= entry
.second
;
879 m
.insert(make_pair(zone_id
, pipe
));
883 for (auto& pipe
: resolved_dests
) {
884 if (!pipe
.dest
.zone
||
885 *pipe
.dest
.zone
!= zone_id
) {
889 m
.insert(make_pair(*pipe
.dest
.zone
, pipe
));
895 void RGWBucketSyncPolicyHandler::get_pipes(std::set
<rgw_sync_bucket_pipe
> *_sources
, std::set
<rgw_sync_bucket_pipe
> *_targets
,
896 std::optional
<rgw_sync_bucket_entity
> filter_peer
) { /* return raw pipes */
897 for (auto& entry
: source_pipes
.pipe_map
) {
898 auto& source_pipe
= entry
.second
;
900 source_pipe
.source
.match(*filter_peer
)) {
901 _sources
->insert(source_pipe
);
905 for (auto& entry
: target_pipes
.pipe_map
) {
906 auto& target_pipe
= entry
.second
;
908 target_pipe
.dest
.match(*filter_peer
)) {
909 _targets
->insert(target_pipe
);
914 bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
920 if (bucket_is_sync_source()) {
924 return (zone_svc
->need_to_log_data() &&
925 bucket_info
->datasync_flag_enabled());
928 bool RGWBucketSyncPolicyHandler::bucket_imports_data() const
930 return bucket_is_sync_target();