1 #include "svc_bucket_sync_sobj.h"
3 #include "svc_sys_obj_cache.h"
4 #include "svc_bucket_sobj.h"
6 #include "rgw/rgw_bucket_sync.h"
7 #include "rgw/rgw_zone.h"
8 #include "rgw/rgw_sync_policy.h"
10 #define dout_subsys ceph_subsys_rgw
// Name prefixes of the per-bucket hint-index objects. get_sources_obj() and
// get_dests_obj() append "." + the bucket key to these to build the raw
// object name, which is stored in the zone's log pool.
12 static string bucket_sync_sources_oid_prefix
= "bucket.sync-source-hints";
13 static string bucket_sync_targets_oid_prefix
= "bucket.sync-target-hints";
15 class RGWSI_Bucket_Sync_SObj_HintIndexManager
{
24 RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone
*_zone_svc
,
25 RGWSI_SysObj
*_sysobj_svc
) {
27 svc
.sysobj
= _sysobj_svc
;
29 cct
= svc
.zone
->ctx();
32 rgw_raw_obj
get_sources_obj(const rgw_bucket
& bucket
) const;
33 rgw_raw_obj
get_dests_obj(const rgw_bucket
& bucket
) const;
35 template <typename C1
, typename C2
>
36 int update_hints(const RGWBucketInfo
& bucket_info
,
44 RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext
*cct
) : RGWSI_Bucket_Sync(cct
) {
46 RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
49 void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone
*_zone_svc
,
50 RGWSI_SysObj
*_sysobj_svc
,
51 RGWSI_SysObj_Cache
*_cache_svc
,
52 RGWSI_Bucket_SObj
*bucket_sobj_svc
)
55 svc
.sysobj
= _sysobj_svc
;
56 svc
.cache
= _cache_svc
;
57 svc
.bucket_sobj
= bucket_sobj_svc
;
59 hint_index_mgr
.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc
.zone
, svc
.sysobj
));
62 int RGWSI_Bucket_Sync_SObj::do_start(optional_yield
)
64 sync_policy_cache
.reset(new RGWChainedCacheImpl
<bucket_sync_policy_cache_entry
>);
65 sync_policy_cache
->init(svc
.cache
);
70 void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx
& ctx
,
71 const std::set
<rgw_zone_id
>& zones
,
72 const std::set
<rgw_bucket
>& buckets
,
73 std::set
<rgw_sync_bucket_entity
> *hint_entities
,
76 vector
<rgw_bucket
> hint_buckets
;
78 hint_buckets
.reserve(buckets
.size());
80 for (auto& b
: buckets
) {
81 RGWBucketInfo hint_bucket_info
;
82 int ret
= svc
.bucket_sobj
->read_bucket_info(ctx
, b
, &hint_bucket_info
,
83 nullptr, nullptr, boost::none
,
86 ldout(cct
, 20) << "could not init bucket info for hint bucket=" << b
<< " ... skipping" << dendl
;
90 hint_buckets
.emplace_back(std::move(hint_bucket_info
.bucket
));
93 for (auto& zone
: zones
) {
94 for (auto& b
: hint_buckets
) {
95 hint_entities
->insert(rgw_sync_bucket_entity(zone
, b
));
100 int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx
& ctx
,
101 rgw_sync_bucket_entity
& self_entity
,
102 RGWBucketSyncPolicyHandlerRef
& handler
,
103 RGWBucketSyncPolicyHandlerRef
& zone_policy_handler
,
104 std::map
<optional_zone_bucket
, RGWBucketSyncPolicyHandlerRef
>& temp_map
,
107 set
<rgw_zone_id
> source_zones
;
108 set
<rgw_zone_id
> target_zones
;
110 zone_policy_handler
->reflect(nullptr, nullptr,
114 false); /* relaxed: also get all zones that we allow to sync to/from */
116 std::set
<rgw_sync_bucket_entity
> hint_entities
;
118 get_hint_entities(ctx
, source_zones
, handler
->get_source_hints(), &hint_entities
, y
);
119 get_hint_entities(ctx
, target_zones
, handler
->get_target_hints(), &hint_entities
, y
);
121 std::set
<rgw_sync_bucket_pipe
> resolved_sources
;
122 std::set
<rgw_sync_bucket_pipe
> resolved_dests
;
124 for (auto& hint_entity
: hint_entities
) {
125 if (!hint_entity
.zone
||
126 !hint_entity
.bucket
) {
127 continue; /* shouldn't really happen */
130 auto& zid
= *hint_entity
.zone
;
131 auto& hint_bucket
= *hint_entity
.bucket
;
133 RGWBucketSyncPolicyHandlerRef hint_bucket_handler
;
135 auto iter
= temp_map
.find(optional_zone_bucket(zid
, hint_bucket
));
136 if (iter
!= temp_map
.end()) {
137 hint_bucket_handler
= iter
->second
;
139 int r
= do_get_policy_handler(ctx
, zid
, hint_bucket
, temp_map
, &hint_bucket_handler
, y
);
141 ldout(cct
, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket
<< " ... skipping" << dendl
;
146 hint_bucket_handler
->get_pipes(&resolved_dests
,
148 self_entity
); /* flipping resolved dests and sources as these are
149 relative to the remote entity */
152 handler
->set_resolved_hints(std::move(resolved_sources
), std::move(resolved_dests
));
157 int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx
& ctx
,
158 std::optional
<rgw_zone_id
> zone
,
159 std::optional
<rgw_bucket
> _bucket
,
160 std::map
<optional_zone_bucket
, RGWBucketSyncPolicyHandlerRef
>& temp_map
,
161 RGWBucketSyncPolicyHandlerRef
*handler
,
165 *handler
= svc
.zone
->get_sync_policy_handler(zone
);
169 auto& bucket
= *_bucket
;
174 if (zone
&& *zone
!= svc
.zone
->zone_id()) {
178 bucket_key
= RGWSI_Bucket::get_bi_meta_key(bucket
);
180 string
cache_key("bi/" + zone_key
+ "/" + bucket_key
);
182 if (auto e
= sync_policy_cache
->find(cache_key
)) {
183 *handler
= e
->handler
;
187 bucket_sync_policy_cache_entry e
;
188 rgw_cache_entry_info cache_info
;
190 RGWBucketInfo bucket_info
;
191 map
<string
, bufferlist
> attrs
;
193 int r
= svc
.bucket_sobj
->read_bucket_instance_info(ctx
.bi
,
202 ldout(cct
, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key
<< ") returned r=" << r
<< dendl
;
207 auto zone_policy_handler
= svc
.zone
->get_sync_policy_handler(zone
);
208 if (!zone_policy_handler
) {
209 ldout(cct
, 20) << "ERROR: could not find policy handler for zone=" << zone
<< dendl
;
213 e
.handler
.reset(zone_policy_handler
->alloc_child(bucket_info
, std::move(attrs
)));
215 r
= e
.handler
->init(y
);
217 ldout(cct
, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r
<< dendl
;
221 temp_map
.emplace(optional_zone_bucket
{zone
, bucket
}, e
.handler
);
223 rgw_sync_bucket_entity
self_entity(zone
.value_or(svc
.zone
->zone_id()), bucket
);
225 r
= resolve_policy_hints(ctx
, self_entity
,
230 ldout(cct
, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key
<< ", r=" << r
<< dendl
;
234 if (!sync_policy_cache
->put(svc
.cache
, cache_key
, &e
, {&cache_info
})) {
235 ldout(cct
, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl
;
238 *handler
= e
.handler
;
243 int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx
& ctx
,
244 std::optional
<rgw_zone_id
> zone
,
245 std::optional
<rgw_bucket
> _bucket
,
246 RGWBucketSyncPolicyHandlerRef
*handler
,
249 std::map
<optional_zone_bucket
, RGWBucketSyncPolicyHandlerRef
> temp_map
;
250 return do_get_policy_handler(ctx
, zone
, _bucket
, temp_map
, handler
, y
);
/*
 * Compute the difference between two ordered sets.
 *
 * Both inputs are std::set, so they are iterated in ascending operator<
 * order; the two iterators are advanced in lock-step like a merge:
 *   - an element found only in orig_set is appended to *removed,
 *   - an element found only in new_set is appended to *added,
 *   - an element present in both is skipped.
 * Elements are appended in ascending order and the output vectors are NOT
 * cleared first (callers pass freshly constructed vectors).
 *
 * Returns true if, after the walk, either output vector is non-empty.
 *
 * Every dereference is guarded by an end() check. The previous version
 * evaluated `*oiter < *niter` / `*niter < *oiter` before comparing the
 * iterator against end(), dereferencing a past-the-end iterator (undefined
 * behavior) whenever one of the sets was exhausted inside an inner loop.
 *
 * Templated on the element type (deduced as rgw_bucket by existing callers);
 * T must provide operator== and operator<.
 */
template <typename T>
static bool diff_sets(const std::set<T>& orig_set,
                      const std::set<T>& new_set,
                      std::vector<T> *added,
                      std::vector<T> *removed)
{
  auto oiter = orig_set.begin();
  auto niter = new_set.begin();

  while (oiter != orig_set.end() &&
         niter != new_set.end()) {
    if (*oiter == *niter) {
      /* present in both sets: nothing to report */
      ++oiter;
      ++niter;
    } else if (*oiter < *niter) {
      /* only in the original set */
      removed->push_back(*oiter);
      ++oiter;
    } else {
      /* only in the new set */
      added->push_back(*niter);
      ++niter;
    }
  }

  /* whatever is left in either set has no counterpart in the other one */
  removed->insert(removed->end(), oiter, orig_set.end());
  added->insert(added->end(), niter, new_set.end());

  return !(removed->empty() && added->empty());
}
290 class RGWSI_BS_SObj_HintIndexObj
292 friend class RGWSI_Bucket_Sync_SObj
;
296 RGWSI_SysObj
*sysobj
;
299 RGWSysObjectCtx obj_ctx
;
303 RGWObjVersionTracker ot
;
305 bool has_data
{false};
310 map
<rgw_bucket
/* info_source */, obj_version
> sources
;
312 void encode(bufferlist
& bl
) const {
313 ENCODE_START(1, 1, bl
);
319 void decode(bufferlist::const_iterator
& bl
) {
326 bool add(const rgw_bucket
& info_source
,
327 const obj_version
& info_source_ver
) {
328 auto& ver
= sources
[info_source
];
330 if (ver
== info_source_ver
) { /* already updated */
334 if (info_source_ver
.tag
== ver
.tag
&&
335 info_source_ver
.ver
< ver
.ver
) {
339 ver
= info_source_ver
;
344 bool remove(const rgw_bucket
& info_source
,
345 const obj_version
& info_source_ver
) {
346 auto iter
= sources
.find(info_source
);
347 if (iter
== sources
.end()) {
351 auto& ver
= iter
->second
;
353 if (info_source_ver
.tag
== ver
.tag
&&
354 info_source_ver
.ver
< ver
.ver
) {
358 sources
.erase(info_source
);
363 return sources
.empty();
367 struct single_instance_info
{
368 map
<rgw_bucket
, bi_entry
> entries
;
370 void encode(bufferlist
& bl
) const {
371 ENCODE_START(1, 1, bl
);
376 void decode(bufferlist::const_iterator
& bl
) {
382 bool add_entry(const rgw_bucket
& info_source
,
383 const obj_version
& info_source_ver
,
384 const rgw_bucket
& bucket
) {
385 auto& entry
= entries
[bucket
];
387 if (!entry
.add(info_source
, info_source_ver
)) {
391 entry
.bucket
= bucket
;
396 bool remove_entry(const rgw_bucket
& info_source
,
397 const obj_version
& info_source_ver
,
398 const rgw_bucket
& bucket
) {
399 auto iter
= entries
.find(bucket
);
400 if (iter
== entries
.end()) {
404 if (!iter
->second
.remove(info_source
, info_source_ver
)) {
408 if (iter
->second
.empty()) {
420 return entries
.empty();
423 void get_entities(std::set
<rgw_bucket
> *result
) const {
424 for (auto& iter
: entries
) {
425 result
->insert(iter
.first
);
431 map
<rgw_bucket
, single_instance_info
> instances
;
433 void encode(bufferlist
& bl
) const {
434 ENCODE_START(1, 1, bl
);
435 encode(instances
, bl
);
439 void decode(bufferlist::const_iterator
& bl
) {
441 decode(instances
, bl
);
446 return instances
.empty();
453 void get_entities(const rgw_bucket
& bucket
,
454 std::set
<rgw_bucket
> *result
) const {
455 auto iter
= instances
.find(bucket
);
456 if (iter
== instances
.end()) {
459 iter
->second
.get_entities(result
);
463 RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj
*_sysobj_svc
,
464 const rgw_raw_obj
& _obj
) : cct(_sysobj_svc
->ctx()),
465 obj_ctx(_sysobj_svc
->init_obj_ctx()),
467 sysobj(obj_ctx
.get_obj(obj
))
469 svc
.sysobj
= _sysobj_svc
;
472 template <typename C1
, typename C2
>
473 int update(const rgw_bucket
& entity
,
474 const RGWBucketInfo
& info_source
,
480 template <typename C1
, typename C2
>
481 void update_entries(const rgw_bucket
& info_source
,
482 const obj_version
& info_source_ver
,
485 single_instance_info
*instance
);
487 int read(optional_yield y
);
488 int flush(optional_yield y
);
495 void get_entities(const rgw_bucket
& bucket
,
496 std::set
<rgw_bucket
> *result
) const {
497 info
.get_entities(bucket
, result
);
500 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry
)
501 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info
)
502 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map
)
504 template <typename C1
, typename C2
>
505 int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket
& entity
,
506 const RGWBucketInfo
& info_source
,
513 auto& info_source_ver
= info_source
.objv_tracker
.read_version
;
515 #define MAX_RETRIES 25
517 for (int i
= 0; i
< MAX_RETRIES
; ++i
) {
521 ldout(cct
, 0) << "ERROR: cannot update hint index: failed to read: r=" << r
<< dendl
;
526 auto& instance
= info
.instances
[entity
];
528 update_entries(info_source
.bucket
,
533 if (instance
.empty()) {
534 info
.instances
.erase(entity
);
542 if (r
!= -ECANCELED
) {
543 ldout(cct
, 0) << "ERROR: failed to flush hint index: obj=" << obj
<< " r=" << r
<< dendl
;
549 ldout(cct
, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj
<< "), likely a bug" << dendl
;
554 template <typename C1
, typename C2
>
555 void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket
& info_source
,
556 const obj_version
& info_source_ver
,
559 single_instance_info
*instance
)
562 for (auto& bucket
: *remove
) {
563 instance
->remove_entry(info_source
, info_source_ver
, bucket
);
568 for (auto& bucket
: *add
) {
569 instance
->add_entry(info_source
, info_source_ver
, bucket
);
574 int RGWSI_BS_SObj_HintIndexObj::read(optional_yield y
) {
575 RGWObjVersionTracker _ot
;
578 .set_objv_tracker(&_ot
) /* forcing read of current version */
580 if (r
< 0 && r
!= -ENOENT
) {
581 ldout(cct
, 0) << "ERROR: failed reading data (obj=" << obj
<< "), r=" << r
<< dendl
;
588 auto iter
= bl
.cbegin();
592 } catch (buffer::error
& err
) {
593 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): failed to decode entries, ignoring" << dendl
;
603 int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y
) {
611 .set_objv_tracker(&ot
) /* forcing read of current version */
614 } else { /* remove */
616 .set_objv_tracker(&ot
)
627 rgw_raw_obj
RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket
& bucket
) const
629 rgw_bucket b
= bucket
;
631 return rgw_raw_obj(svc
.zone
->get_zone_params().log_pool
,
632 bucket_sync_sources_oid_prefix
+ "." + b
.get_key());
635 rgw_raw_obj
RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket
& bucket
) const
637 rgw_bucket b
= bucket
;
639 return rgw_raw_obj(svc
.zone
->get_zone_params().log_pool
,
640 bucket_sync_targets_oid_prefix
+ "." + b
.get_key());
643 template <typename C1
, typename C2
>
644 int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const RGWBucketInfo
& bucket_info
,
651 C1 self_entity
= { bucket_info
.bucket
};
653 if (!added_dests
.empty() ||
654 !removed_dests
.empty()) {
655 /* update our dests */
656 RGWSI_BS_SObj_HintIndexObj
index(svc
.sysobj
,
657 get_dests_obj(bucket_info
.bucket
));
658 int r
= index
.update(bucket_info
.bucket
,
664 ldout(cct
, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info
.bucket
<< " r=" << r
<< dendl
;
668 /* update dest buckets */
669 for (auto& dest_bucket
: added_dests
) {
670 RGWSI_BS_SObj_HintIndexObj
dep_index(svc
.sysobj
,
671 get_sources_obj(dest_bucket
));
672 int r
= dep_index
.update(dest_bucket
,
675 static_cast<C2
*>(nullptr),
678 ldout(cct
, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket
<< " r=" << r
<< dendl
;
682 /* update removed dest buckets */
683 for (auto& dest_bucket
: removed_dests
) {
684 RGWSI_BS_SObj_HintIndexObj
dep_index(svc
.sysobj
,
685 get_sources_obj(dest_bucket
));
686 int r
= dep_index
.update(dest_bucket
,
688 static_cast<C1
*>(nullptr),
692 ldout(cct
, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket
<< " r=" << r
<< dendl
;
698 if (!added_sources
.empty() ||
699 !removed_sources
.empty()) {
700 RGWSI_BS_SObj_HintIndexObj
index(svc
.sysobj
,
701 get_sources_obj(bucket_info
.bucket
));
702 /* update our sources */
703 int r
= index
.update(bucket_info
.bucket
,
709 ldout(cct
, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info
.bucket
<< " r=" << r
<< dendl
;
713 /* update added sources buckets */
714 for (auto& source_bucket
: added_sources
) {
715 RGWSI_BS_SObj_HintIndexObj
dep_index(svc
.sysobj
,
716 get_dests_obj(source_bucket
));
717 int r
= dep_index
.update(source_bucket
,
720 static_cast<C2
*>(nullptr),
723 ldout(cct
, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket
<< " r=" << r
<< dendl
;
727 /* update removed dest buckets */
728 for (auto& source_bucket
: removed_sources
) {
729 RGWSI_BS_SObj_HintIndexObj
dep_index(svc
.sysobj
,
730 get_dests_obj(source_bucket
));
731 int r
= dep_index
.update(source_bucket
,
733 static_cast<C1
*>(nullptr),
737 ldout(cct
, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket
<< " r=" << r
<< dendl
;
746 int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo
& bucket_info
,
749 std::set
<rgw_bucket
> sources_set
;
750 std::set
<rgw_bucket
> dests_set
;
752 if (bucket_info
.sync_policy
) {
753 bucket_info
.sync_policy
->get_potential_related_buckets(bucket_info
.bucket
,
758 std::vector
<rgw_bucket
> removed_sources
;
759 removed_sources
.reserve(sources_set
.size());
760 for (auto& e
: sources_set
) {
761 removed_sources
.push_back(e
);
764 std::vector
<rgw_bucket
> removed_dests
;
765 removed_dests
.reserve(dests_set
.size());
766 for (auto& e
: dests_set
) {
767 removed_dests
.push_back(e
);
770 std::vector
<rgw_bucket
> added_sources
;
771 std::vector
<rgw_bucket
> added_dests
;
773 return hint_index_mgr
->update_hints(bucket_info
,
// Recompute the potential related (source/dest) buckets for the bucket's
// original and new sync policy, diff the old and new sets with diff_sets(),
// log the differences, and hand the result to hint_index_mgr->update_hints().
// NOTE(review): this excerpt is missing lines of the original function
// (e.g. its trailing parameter(s) and the tail of the update_hints() call),
// so the full argument list is not visible here.
781 int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo
& bucket_info
,
782 RGWBucketInfo
*orig_bucket_info
,
785 std::set
<rgw_bucket
> orig_sources
;
786 std::set
<rgw_bucket
> orig_dests
;
// Related buckets under the original policy (left empty when there is no
// original bucket info or it has no sync policy).
788 if (orig_bucket_info
&&
789 orig_bucket_info
->sync_policy
) {
790 orig_bucket_info
->sync_policy
->get_potential_related_buckets(bucket_info
.bucket
,
795 std::set
<rgw_bucket
> sources
;
796 std::set
<rgw_bucket
> dests
;
// Related buckets under the new policy.
797 if (bucket_info
.sync_policy
) {
798 bucket_info
.sync_policy
->get_potential_related_buckets(bucket_info
.bucket
,
803 std::vector
<rgw_bucket
> removed_sources
;
804 std::vector
<rgw_bucket
> added_sources
;
805 bool found
= diff_sets(orig_sources
, sources
, &added_sources
, &removed_sources
);
806 ldout(cct
, 20) << __func__
<< "(): bucket=" << bucket_info
.bucket
<< ": orig_sources=" << orig_sources
<< " new_sources=" << sources
<< dendl
;
807 ldout(cct
, 20) << __func__
<< "(): bucket=" << bucket_info
.bucket
<< ": potential sources added=" << added_sources
<< " removed=" << removed_sources
<< dendl
;
809 std::vector
<rgw_bucket
> removed_dests
;
810 std::vector
<rgw_bucket
> added_dests
;
// NOTE(review): `||` short-circuits. When the sources diff above already set
// `found`, diff_sets() is never called for the dests, so added_dests and
// removed_dests stay empty (and the two log lines below print them empty).
// Presumably removed_dests is passed to update_hints() on a line not shown
// here, in which case removed dest buckets are silently dropped. Consider
// `found = diff_sets(orig_dests, dests, &added_dests, &removed_dests) || found;`
// so that both diffs always run.
811 found
= found
|| diff_sets(orig_dests
, dests
, &added_dests
, &removed_dests
);
813 ldout(cct
, 20) << __func__
<< "(): bucket=" << bucket_info
.bucket
<< ": orig_dests=" << orig_dests
<< " new_dests=" << dests
<< dendl
;
814 ldout(cct
, 20) << __func__
<< "(): bucket=" << bucket_info
.bucket
<< ": potential dests added=" << added_dests
<< " removed=" << removed_dests
<< dendl
;
820 return hint_index_mgr
->update_hints(bucket_info
,
821 dests
, /* set all dests, not just the ones that were added */
823 sources
, /* set all sources, not just that the ones that were added */
828 int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket
& bucket
,
829 std::set
<rgw_bucket
> *sources
,
830 std::set
<rgw_bucket
> *dests
,
833 if (!sources
&& !dests
) {
838 RGWSI_BS_SObj_HintIndexObj
index(svc
.sysobj
,
839 hint_index_mgr
->get_sources_obj(bucket
));
840 int r
= index
.read(y
);
842 ldout(cct
, 0) << "ERROR: failed to update sources index for bucket=" << bucket
<< " r=" << r
<< dendl
;
846 index
.get_entities(bucket
, sources
);
848 if (!bucket
.bucket_id
.empty()) {
849 rgw_bucket b
= bucket
;
851 index
.get_entities(b
, sources
);
856 RGWSI_BS_SObj_HintIndexObj
index(svc
.sysobj
,
857 hint_index_mgr
->get_dests_obj(bucket
));
858 int r
= index
.read(y
);
860 ldout(cct
, 0) << "ERROR: failed to read targets index for bucket=" << bucket
<< " r=" << r
<< dendl
;
864 index
.get_entities(bucket
, dests
);
866 if (!bucket
.bucket_id
.empty()) {
867 rgw_bucket b
= bucket
;
869 index
.get_entities(b
, dests
);