]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/services/svc_bucket_sync_sobj.cc
f80b9618bb9a5e320ca23f9e1e9e29e206f6b8a5
[ceph.git] / ceph / src / rgw / services / svc_bucket_sync_sobj.cc
1 #include "svc_bucket_sync_sobj.h"
2 #include "svc_zone.h"
3 #include "svc_sys_obj_cache.h"
4 #include "svc_bucket_sobj.h"
5
6 #include "rgw/rgw_bucket_sync.h"
7 #include "rgw/rgw_zone.h"
8 #include "rgw/rgw_sync_policy.h"
9
10 #define dout_subsys ceph_subsys_rgw
11
12 static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints";
13 static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints";
14
15 class RGWSI_Bucket_Sync_SObj_HintIndexManager {
16 CephContext *cct;
17
18 struct {
19 RGWSI_Zone *zone;
20 RGWSI_SysObj *sysobj;
21 } svc;
22
23 public:
24 RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone *_zone_svc,
25 RGWSI_SysObj *_sysobj_svc) {
26 svc.zone = _zone_svc;
27 svc.sysobj = _sysobj_svc;
28
29 cct = svc.zone->ctx();
30 }
31
32 rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const;
33 rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const;
34
35 template <typename C1, typename C2>
36 int update_hints(const RGWBucketInfo& bucket_info,
37 C1& added_dests,
38 C2& removed_dests,
39 C1& added_sources,
40 C2& removed_sources,
41 optional_yield y);
42 };
43
44 RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {
45 }
46 RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
47 }
48
49 void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
50 RGWSI_SysObj *_sysobj_svc,
51 RGWSI_SysObj_Cache *_cache_svc,
52 RGWSI_Bucket_SObj *bucket_sobj_svc)
53 {
54 svc.zone = _zone_svc;
55 svc.sysobj = _sysobj_svc;
56 svc.cache = _cache_svc;
57 svc.bucket_sobj = bucket_sobj_svc;
58
59 hint_index_mgr.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc.zone, svc.sysobj));
60 }
61
62 int RGWSI_Bucket_Sync_SObj::do_start(optional_yield)
63 {
64 sync_policy_cache.reset(new RGWChainedCacheImpl<bucket_sync_policy_cache_entry>);
65 sync_policy_cache->init(svc.cache);
66
67 return 0;
68 }
69
70 void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx,
71 const std::set<rgw_zone_id>& zones,
72 const std::set<rgw_bucket>& buckets,
73 std::set<rgw_sync_bucket_entity> *hint_entities,
74 optional_yield y)
75 {
76 vector<rgw_bucket> hint_buckets;
77
78 hint_buckets.reserve(buckets.size());
79
80 for (auto& b : buckets) {
81 RGWBucketInfo hint_bucket_info;
82 int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info,
83 nullptr, nullptr, boost::none,
84 y);
85 if (ret < 0) {
86 ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
87 continue;
88 }
89
90 hint_buckets.emplace_back(std::move(hint_bucket_info.bucket));
91 }
92
93 for (auto& zone : zones) {
94 for (auto& b : hint_buckets) {
95 hint_entities->insert(rgw_sync_bucket_entity(zone, b));
96 }
97 }
98 }
99
100 int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx,
101 rgw_sync_bucket_entity& self_entity,
102 RGWBucketSyncPolicyHandlerRef& handler,
103 RGWBucketSyncPolicyHandlerRef& zone_policy_handler,
104 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
105 optional_yield y)
106 {
107 set<rgw_zone_id> source_zones;
108 set<rgw_zone_id> target_zones;
109
110 zone_policy_handler->reflect(nullptr, nullptr,
111 nullptr, nullptr,
112 &source_zones,
113 &target_zones,
114 false); /* relaxed: also get all zones that we allow to sync to/from */
115
116 std::set<rgw_sync_bucket_entity> hint_entities;
117
118 get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y);
119 get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y);
120
121 std::set<rgw_sync_bucket_pipe> resolved_sources;
122 std::set<rgw_sync_bucket_pipe> resolved_dests;
123
124 for (auto& hint_entity : hint_entities) {
125 if (!hint_entity.zone ||
126 !hint_entity.bucket) {
127 continue; /* shouldn't really happen */
128 }
129
130 auto& zid = *hint_entity.zone;
131 auto& hint_bucket = *hint_entity.bucket;
132
133 RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
134
135 auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket));
136 if (iter != temp_map.end()) {
137 hint_bucket_handler = iter->second;
138 } else {
139 int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y);
140 if (r < 0) {
141 ldout(cct, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl;
142 continue;
143 }
144 }
145
146 hint_bucket_handler->get_pipes(&resolved_dests,
147 &resolved_sources,
148 self_entity); /* flipping resolved dests and sources as these are
149 relative to the remote entity */
150 }
151
152 handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests));
153
154 return 0;
155 }
156
157 int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
158 std::optional<rgw_zone_id> zone,
159 std::optional<rgw_bucket> _bucket,
160 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
161 RGWBucketSyncPolicyHandlerRef *handler,
162 optional_yield y)
163 {
164 if (!_bucket) {
165 *handler = svc.zone->get_sync_policy_handler(zone);
166 return 0;
167 }
168
169 auto& bucket = *_bucket;
170
171 string zone_key;
172 string bucket_key;
173
174 if (zone && *zone != svc.zone->zone_id()) {
175 zone_key = zone->id;
176 }
177
178 bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket);
179
180 string cache_key("bi/" + zone_key + "/" + bucket_key);
181
182 if (auto e = sync_policy_cache->find(cache_key)) {
183 *handler = e->handler;
184 return 0;
185 }
186
187 bucket_sync_policy_cache_entry e;
188 rgw_cache_entry_info cache_info;
189
190 RGWBucketInfo bucket_info;
191 map<string, bufferlist> attrs;
192
193 int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi,
194 bucket_key,
195 &bucket_info,
196 nullptr,
197 &attrs,
198 y,
199 &cache_info);
200 if (r < 0) {
201 if (r != -ENOENT) {
202 ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl;
203 }
204 return r;
205 }
206
207 auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone);
208 if (!zone_policy_handler) {
209 ldout(cct, 20) << "ERROR: could not find policy handler for zone=" << zone << dendl;
210 return -ENOENT;
211 }
212
213 e.handler.reset(zone_policy_handler->alloc_child(bucket_info, std::move(attrs)));
214
215 r = e.handler->init(y);
216 if (r < 0) {
217 ldout(cct, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl;
218 return r;
219 }
220
221 temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler);
222
223 rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket);
224
225 r = resolve_policy_hints(ctx, self_entity,
226 e.handler,
227 zone_policy_handler,
228 temp_map, y);
229 if (r < 0) {
230 ldout(cct, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl;
231 return r;
232 }
233
234 if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
235 ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
236 }
237
238 *handler = e.handler;
239
240 return 0;
241 }
242
243 int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
244 std::optional<rgw_zone_id> zone,
245 std::optional<rgw_bucket> _bucket,
246 RGWBucketSyncPolicyHandlerRef *handler,
247 optional_yield y)
248 {
249 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef> temp_map;
250 return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y);
251 }
252
253 static bool diff_sets(std::set<rgw_bucket>& orig_set,
254 std::set<rgw_bucket>& new_set,
255 vector<rgw_bucket> *added,
256 vector<rgw_bucket> *removed)
257 {
258 auto oiter = orig_set.begin();
259 auto niter = new_set.begin();
260
261 while (oiter != orig_set.end() &&
262 niter != new_set.end()) {
263 if (*oiter == *niter) {
264 ++oiter;
265 ++niter;
266 continue;
267 }
268 while (*oiter < *niter &&
269 oiter != orig_set.end()) {
270 removed->push_back(*oiter);
271 ++oiter;
272 }
273 while (*niter < *oiter
274 && niter != new_set.end()) {
275 added->push_back(*niter);
276 ++niter;
277 }
278 }
279 for (; oiter != orig_set.end(); ++oiter) {
280 removed->push_back(*oiter);
281 }
282 for (; niter != new_set.end(); ++niter) {
283 added->push_back(*niter);
284 }
285
286 return !(removed->empty() && added->empty());
287 }
288
289
290 class RGWSI_BS_SObj_HintIndexObj
291 {
292 friend class RGWSI_Bucket_Sync_SObj;
293
294 CephContext *cct;
295 struct {
296 RGWSI_SysObj *sysobj;
297 } svc;
298
299 RGWSysObjectCtx obj_ctx;
300 rgw_raw_obj obj;
301 RGWSysObj sysobj;
302
303 RGWObjVersionTracker ot;
304
305 bool has_data{false};
306
307 public:
308 struct bi_entry {
309 rgw_bucket bucket;
310 map<rgw_bucket /* info_source */, obj_version> sources;
311
312 void encode(bufferlist& bl) const {
313 ENCODE_START(1, 1, bl);
314 encode(bucket, bl);
315 encode(sources, bl);
316 ENCODE_FINISH(bl);
317 }
318
319 void decode(bufferlist::const_iterator& bl) {
320 DECODE_START(1, bl);
321 decode(bucket, bl);
322 decode(sources, bl);
323 DECODE_FINISH(bl);
324 }
325
326 bool add(const rgw_bucket& info_source,
327 const obj_version& info_source_ver) {
328 auto& ver = sources[info_source];
329
330 if (ver == info_source_ver) { /* already updated */
331 return false;
332 }
333
334 if (info_source_ver.tag == ver.tag &&
335 info_source_ver.ver < ver.ver) {
336 return false;
337 }
338
339 ver = info_source_ver;
340
341 return true;
342 }
343
344 bool remove(const rgw_bucket& info_source,
345 const obj_version& info_source_ver) {
346 auto iter = sources.find(info_source);
347 if (iter == sources.end()) {
348 return false;
349 }
350
351 auto& ver = iter->second;
352
353 if (info_source_ver.tag == ver.tag &&
354 info_source_ver.ver < ver.ver) {
355 return false;
356 }
357
358 sources.erase(info_source);
359 return true;
360 }
361
362 bool empty() const {
363 return sources.empty();
364 }
365 };
366
367 struct single_instance_info {
368 map<rgw_bucket, bi_entry> entries;
369
370 void encode(bufferlist& bl) const {
371 ENCODE_START(1, 1, bl);
372 encode(entries, bl);
373 ENCODE_FINISH(bl);
374 }
375
376 void decode(bufferlist::const_iterator& bl) {
377 DECODE_START(1, bl);
378 decode(entries, bl);
379 DECODE_FINISH(bl);
380 }
381
382 bool add_entry(const rgw_bucket& info_source,
383 const obj_version& info_source_ver,
384 const rgw_bucket& bucket) {
385 auto& entry = entries[bucket];
386
387 if (!entry.add(info_source, info_source_ver)) {
388 return false;
389 }
390
391 entry.bucket = bucket;
392
393 return true;
394 }
395
396 bool remove_entry(const rgw_bucket& info_source,
397 const obj_version& info_source_ver,
398 const rgw_bucket& bucket) {
399 auto iter = entries.find(bucket);
400 if (iter == entries.end()) {
401 return false;
402 }
403
404 if (!iter->second.remove(info_source, info_source_ver)) {
405 return false;
406 }
407
408 if (iter->second.empty()) {
409 entries.erase(iter);
410 }
411
412 return true;
413 }
414
415 void clear() {
416 entries.clear();
417 }
418
419 bool empty() const {
420 return entries.empty();
421 }
422
423 void get_entities(std::set<rgw_bucket> *result) const {
424 for (auto& iter : entries) {
425 result->insert(iter.first);
426 }
427 }
428 };
429
430 struct info_map {
431 map<rgw_bucket, single_instance_info> instances;
432
433 void encode(bufferlist& bl) const {
434 ENCODE_START(1, 1, bl);
435 encode(instances, bl);
436 ENCODE_FINISH(bl);
437 }
438
439 void decode(bufferlist::const_iterator& bl) {
440 DECODE_START(1, bl);
441 decode(instances, bl);
442 DECODE_FINISH(bl);
443 }
444
445 bool empty() const {
446 return instances.empty();
447 }
448
449 void clear() {
450 instances.clear();
451 }
452
453 void get_entities(const rgw_bucket& bucket,
454 std::set<rgw_bucket> *result) const {
455 auto iter = instances.find(bucket);
456 if (iter == instances.end()) {
457 return;
458 }
459 iter->second.get_entities(result);
460 }
461 } info;
462
463 RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc,
464 const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()),
465 obj_ctx(_sysobj_svc->init_obj_ctx()),
466 obj(_obj),
467 sysobj(obj_ctx.get_obj(obj))
468 {
469 svc.sysobj = _sysobj_svc;
470 }
471
472 template <typename C1, typename C2>
473 int update(const rgw_bucket& entity,
474 const RGWBucketInfo& info_source,
475 C1 *add,
476 C2 *remove,
477 optional_yield y);
478
479 private:
480 template <typename C1, typename C2>
481 void update_entries(const rgw_bucket& info_source,
482 const obj_version& info_source_ver,
483 C1 *add,
484 C2 *remove,
485 single_instance_info *instance);
486
487 int read(optional_yield y);
488 int flush(optional_yield y);
489
490 void invalidate() {
491 has_data = false;
492 info.clear();
493 }
494
495 void get_entities(const rgw_bucket& bucket,
496 std::set<rgw_bucket> *result) const {
497 info.get_entities(bucket, result);
498 }
499 };
500 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry)
501 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info)
502 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map)
503
504 template <typename C1, typename C2>
505 int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity,
506 const RGWBucketInfo& info_source,
507 C1 *add,
508 C2 *remove,
509 optional_yield y)
510 {
511 int r = 0;
512
513 auto& info_source_ver = info_source.objv_tracker.read_version;
514
515 #define MAX_RETRIES 25
516
517 for (int i = 0; i < MAX_RETRIES; ++i) {
518 if (!has_data) {
519 r = read(y);
520 if (r < 0) {
521 ldout(cct, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl;
522 return r;
523 }
524 }
525
526 auto& instance = info.instances[entity];
527
528 update_entries(info_source.bucket,
529 info_source_ver,
530 add, remove,
531 &instance);
532
533 if (instance.empty()) {
534 info.instances.erase(entity);
535 }
536
537 r = flush(y);
538 if (r >= 0) {
539 return 0;
540 }
541
542 if (r != -ECANCELED) {
543 ldout(cct, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl;
544 return r;
545 }
546
547 invalidate();
548 }
549 ldout(cct, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl;
550
551 return -EIO;
552 }
553
554 template <typename C1, typename C2>
555 void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source,
556 const obj_version& info_source_ver,
557 C1 *add,
558 C2 *remove,
559 single_instance_info *instance)
560 {
561 if (remove) {
562 for (auto& bucket : *remove) {
563 instance->remove_entry(info_source, info_source_ver, bucket);
564 }
565 }
566
567 if (add) {
568 for (auto& bucket : *add) {
569 instance->add_entry(info_source, info_source_ver, bucket);
570 }
571 }
572 }
573
574 int RGWSI_BS_SObj_HintIndexObj::read(optional_yield y) {
575 RGWObjVersionTracker _ot;
576 bufferlist bl;
577 int r = sysobj.rop()
578 .set_objv_tracker(&_ot) /* forcing read of current version */
579 .read(&bl, y);
580 if (r < 0 && r != -ENOENT) {
581 ldout(cct, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl;
582 return r;
583 }
584
585 ot = _ot;
586
587 if (r >= 0) {
588 auto iter = bl.cbegin();
589 try {
590 decode(info, iter);
591 has_data = true;
592 } catch (buffer::error& err) {
593 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl;
594 info.clear();
595 }
596 } else {
597 info.clear();
598 }
599
600 return 0;
601 }
602
603 int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) {
604 int r;
605
606 if (!info.empty()) {
607 bufferlist bl;
608 encode(info, bl);
609
610 r = sysobj.wop()
611 .set_objv_tracker(&ot) /* forcing read of current version */
612 .write(bl, y);
613
614 } else { /* remove */
615 r = sysobj.wop()
616 .set_objv_tracker(&ot)
617 .remove(y);
618 }
619
620 if (r < 0) {
621 return r;
622 }
623
624 return 0;
625 }
626
627 rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const
628 {
629 rgw_bucket b = bucket;
630 b.bucket_id.clear();
631 return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
632 bucket_sync_sources_oid_prefix + "." + b.get_key());
633 }
634
635 rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const
636 {
637 rgw_bucket b = bucket;
638 b.bucket_id.clear();
639 return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
640 bucket_sync_targets_oid_prefix + "." + b.get_key());
641 }
642
643 template <typename C1, typename C2>
644 int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const RGWBucketInfo& bucket_info,
645 C1& added_dests,
646 C2& removed_dests,
647 C1& added_sources,
648 C2& removed_sources,
649 optional_yield y)
650 {
651 C1 self_entity = { bucket_info.bucket };
652
653 if (!added_dests.empty() ||
654 !removed_dests.empty()) {
655 /* update our dests */
656 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
657 get_dests_obj(bucket_info.bucket));
658 int r = index.update(bucket_info.bucket,
659 bucket_info,
660 &added_dests,
661 &removed_dests,
662 y);
663 if (r < 0) {
664 ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
665 return r;
666 }
667
668 /* update dest buckets */
669 for (auto& dest_bucket : added_dests) {
670 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
671 get_sources_obj(dest_bucket));
672 int r = dep_index.update(dest_bucket,
673 bucket_info,
674 &self_entity,
675 static_cast<C2 *>(nullptr),
676 y);
677 if (r < 0) {
678 ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
679 return r;
680 }
681 }
682 /* update removed dest buckets */
683 for (auto& dest_bucket : removed_dests) {
684 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
685 get_sources_obj(dest_bucket));
686 int r = dep_index.update(dest_bucket,
687 bucket_info,
688 static_cast<C1 *>(nullptr),
689 &self_entity,
690 y);
691 if (r < 0) {
692 ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
693 return r;
694 }
695 }
696 }
697
698 if (!added_sources.empty() ||
699 !removed_sources.empty()) {
700 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
701 get_sources_obj(bucket_info.bucket));
702 /* update our sources */
703 int r = index.update(bucket_info.bucket,
704 bucket_info,
705 &added_sources,
706 &removed_sources,
707 y);
708 if (r < 0) {
709 ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
710 return r;
711 }
712
713 /* update added sources buckets */
714 for (auto& source_bucket : added_sources) {
715 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
716 get_dests_obj(source_bucket));
717 int r = dep_index.update(source_bucket,
718 bucket_info,
719 &self_entity,
720 static_cast<C2 *>(nullptr),
721 y);
722 if (r < 0) {
723 ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
724 return r;
725 }
726 }
727 /* update removed dest buckets */
728 for (auto& source_bucket : removed_sources) {
729 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
730 get_dests_obj(source_bucket));
731 int r = dep_index.update(source_bucket,
732 bucket_info,
733 static_cast<C1 *>(nullptr),
734 &self_entity,
735 y);
736 if (r < 0) {
737 ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
738 return r;
739 }
740 }
741 }
742
743 return 0;
744 }
745
746 int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info,
747 optional_yield y)
748 {
749 std::set<rgw_bucket> sources_set;
750 std::set<rgw_bucket> dests_set;
751
752 if (bucket_info.sync_policy) {
753 bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
754 &sources_set,
755 &dests_set);
756 }
757
758 std::vector<rgw_bucket> removed_sources;
759 removed_sources.reserve(sources_set.size());
760 for (auto& e : sources_set) {
761 removed_sources.push_back(e);
762 }
763
764 std::vector<rgw_bucket> removed_dests;
765 removed_dests.reserve(dests_set.size());
766 for (auto& e : dests_set) {
767 removed_dests.push_back(e);
768 }
769
770 std::vector<rgw_bucket> added_sources;
771 std::vector<rgw_bucket> added_dests;
772
773 return hint_index_mgr->update_hints(bucket_info,
774 added_dests,
775 removed_dests,
776 added_sources,
777 removed_sources,
778 y);
779 }
780
781 int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
782 RGWBucketInfo *orig_bucket_info,
783 optional_yield y)
784 {
785 std::set<rgw_bucket> orig_sources;
786 std::set<rgw_bucket> orig_dests;
787
788 if (orig_bucket_info &&
789 orig_bucket_info->sync_policy) {
790 orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket,
791 &orig_sources,
792 &orig_dests);
793 }
794
795 std::set<rgw_bucket> sources;
796 std::set<rgw_bucket> dests;
797 if (bucket_info.sync_policy) {
798 bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
799 &sources,
800 &dests);
801 }
802
803 std::vector<rgw_bucket> removed_sources;
804 std::vector<rgw_bucket> added_sources;
805 bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources);
806 ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl;
807 ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl;
808
809 std::vector<rgw_bucket> removed_dests;
810 std::vector<rgw_bucket> added_dests;
811 found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests);
812
813 ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl;
814 ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl;
815
816 if (!found) {
817 return 0;
818 }
819
820 return hint_index_mgr->update_hints(bucket_info,
821 dests, /* set all dests, not just the ones that were added */
822 removed_dests,
823 sources, /* set all sources, not just that the ones that were added */
824 removed_sources,
825 y);
826 }
827
828 int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket,
829 std::set<rgw_bucket> *sources,
830 std::set<rgw_bucket> *dests,
831 optional_yield y)
832 {
833 if (!sources && !dests) {
834 return 0;
835 }
836
837 if (sources) {
838 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
839 hint_index_mgr->get_sources_obj(bucket));
840 int r = index.read(y);
841 if (r < 0) {
842 ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl;
843 return r;
844 }
845
846 index.get_entities(bucket, sources);
847
848 if (!bucket.bucket_id.empty()) {
849 rgw_bucket b = bucket;
850 b.bucket_id.clear();
851 index.get_entities(b, sources);
852 }
853 }
854
855 if (dests) {
856 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
857 hint_index_mgr->get_dests_obj(bucket));
858 int r = index.read(y);
859 if (r < 0) {
860 ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl;
861 return r;
862 }
863
864 index.get_entities(bucket, dests);
865
866 if (!bucket.bucket_id.empty()) {
867 rgw_bucket b = bucket;
868 b.bucket_id.clear();
869 index.get_entities(b, dests);
870 }
871 }
872
873 return 0;
874 }