]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_bucket_sync_sobj.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / services / svc_bucket_sync_sobj.cc
CommitLineData
9f95a23c
TL
1#include "svc_bucket_sync_sobj.h"
2#include "svc_zone.h"
3#include "svc_sys_obj_cache.h"
4#include "svc_bucket_sobj.h"
5
1e59de90
TL
6#include "rgw_bucket_sync.h"
7#include "rgw_zone.h"
8#include "rgw_sync_policy.h"
9f95a23c
TL
9
10#define dout_subsys ceph_subsys_rgw
11
20effc67
TL
12using namespace std;
13
9f95a23c
TL
14static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints";
15static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints";
16
17class RGWSI_Bucket_Sync_SObj_HintIndexManager {
18 CephContext *cct;
19
20 struct {
21 RGWSI_Zone *zone;
22 RGWSI_SysObj *sysobj;
23 } svc;
24
25public:
26 RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone *_zone_svc,
27 RGWSI_SysObj *_sysobj_svc) {
28 svc.zone = _zone_svc;
29 svc.sysobj = _sysobj_svc;
30
31 cct = svc.zone->ctx();
32 }
33
34 rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const;
35 rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const;
36
37 template <typename C1, typename C2>
b3b6e05e
TL
38 int update_hints(const DoutPrefixProvider *dpp,
39 const RGWBucketInfo& bucket_info,
9f95a23c
TL
40 C1& added_dests,
41 C2& removed_dests,
42 C1& added_sources,
43 C2& removed_sources,
44 optional_yield y);
45};
46
47RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {
48}
49RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
50}
51
52void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
53 RGWSI_SysObj *_sysobj_svc,
54 RGWSI_SysObj_Cache *_cache_svc,
55 RGWSI_Bucket_SObj *bucket_sobj_svc)
56{
57 svc.zone = _zone_svc;
58 svc.sysobj = _sysobj_svc;
59 svc.cache = _cache_svc;
60 svc.bucket_sobj = bucket_sobj_svc;
61
62 hint_index_mgr.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc.zone, svc.sysobj));
63}
64
b3b6e05e 65int RGWSI_Bucket_Sync_SObj::do_start(optional_yield, const DoutPrefixProvider *dpp)
9f95a23c
TL
66{
67 sync_policy_cache.reset(new RGWChainedCacheImpl<bucket_sync_policy_cache_entry>);
68 sync_policy_cache->init(svc.cache);
69
70 return 0;
71}
72
73void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx,
74 const std::set<rgw_zone_id>& zones,
75 const std::set<rgw_bucket>& buckets,
76 std::set<rgw_sync_bucket_entity> *hint_entities,
b3b6e05e 77 optional_yield y, const DoutPrefixProvider *dpp)
9f95a23c
TL
78{
79 vector<rgw_bucket> hint_buckets;
80
81 hint_buckets.reserve(buckets.size());
82
83 for (auto& b : buckets) {
84 RGWBucketInfo hint_bucket_info;
85 int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info,
86 nullptr, nullptr, boost::none,
b3b6e05e 87 y, dpp);
9f95a23c 88 if (ret < 0) {
b3b6e05e 89 ldpp_dout(dpp, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
9f95a23c
TL
90 continue;
91 }
92
93 hint_buckets.emplace_back(std::move(hint_bucket_info.bucket));
94 }
95
96 for (auto& zone : zones) {
97 for (auto& b : hint_buckets) {
98 hint_entities->insert(rgw_sync_bucket_entity(zone, b));
99 }
100 }
101}
102
103int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx,
104 rgw_sync_bucket_entity& self_entity,
105 RGWBucketSyncPolicyHandlerRef& handler,
106 RGWBucketSyncPolicyHandlerRef& zone_policy_handler,
107 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
b3b6e05e
TL
108 optional_yield y,
109 const DoutPrefixProvider *dpp)
9f95a23c
TL
110{
111 set<rgw_zone_id> source_zones;
112 set<rgw_zone_id> target_zones;
113
20effc67 114 zone_policy_handler->reflect(dpp, nullptr, nullptr,
9f95a23c
TL
115 nullptr, nullptr,
116 &source_zones,
117 &target_zones,
118 false); /* relaxed: also get all zones that we allow to sync to/from */
119
120 std::set<rgw_sync_bucket_entity> hint_entities;
121
b3b6e05e
TL
122 get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y, dpp);
123 get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y, dpp);
9f95a23c
TL
124
125 std::set<rgw_sync_bucket_pipe> resolved_sources;
126 std::set<rgw_sync_bucket_pipe> resolved_dests;
127
128 for (auto& hint_entity : hint_entities) {
129 if (!hint_entity.zone ||
130 !hint_entity.bucket) {
131 continue; /* shouldn't really happen */
132 }
133
134 auto& zid = *hint_entity.zone;
135 auto& hint_bucket = *hint_entity.bucket;
136
137 RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
138
139 auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket));
140 if (iter != temp_map.end()) {
141 hint_bucket_handler = iter->second;
142 } else {
b3b6e05e 143 int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y, dpp);
9f95a23c 144 if (r < 0) {
b3b6e05e 145 ldpp_dout(dpp, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl;
9f95a23c
TL
146 continue;
147 }
148 }
149
150 hint_bucket_handler->get_pipes(&resolved_dests,
151 &resolved_sources,
152 self_entity); /* flipping resolved dests and sources as these are
153 relative to the remote entity */
154 }
155
156 handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests));
157
158 return 0;
159}
160
161int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
162 std::optional<rgw_zone_id> zone,
163 std::optional<rgw_bucket> _bucket,
164 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
165 RGWBucketSyncPolicyHandlerRef *handler,
b3b6e05e
TL
166 optional_yield y,
167 const DoutPrefixProvider *dpp)
9f95a23c
TL
168{
169 if (!_bucket) {
170 *handler = svc.zone->get_sync_policy_handler(zone);
171 return 0;
172 }
173
1e59de90
TL
174 auto bucket = *_bucket;
175
176 if (bucket.bucket_id.empty()) {
177 RGWBucketEntryPoint ep_info;
178 int ret = svc.bucket_sobj->read_bucket_entrypoint_info(ctx.ep,
179 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
180 &ep_info,
181 nullptr, /* objv_tracker */
182 nullptr, /* mtime */
183 nullptr, /* attrs */
184 y,
185 dpp,
186 nullptr, /* cache_info */
187 boost::none /* refresh_version */);
188 if (ret < 0) {
189 if (ret != -ENOENT) {
190 ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_info(bucket=" << bucket << ") returned r=" << ret << dendl;
191 }
192 return ret;
193 }
194
195 bucket = ep_info.bucket;
196 }
9f95a23c
TL
197
198 string zone_key;
199 string bucket_key;
200
201 if (zone && *zone != svc.zone->zone_id()) {
202 zone_key = zone->id;
203 }
204
205 bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket);
206
207 string cache_key("bi/" + zone_key + "/" + bucket_key);
208
209 if (auto e = sync_policy_cache->find(cache_key)) {
210 *handler = e->handler;
211 return 0;
212 }
213
214 bucket_sync_policy_cache_entry e;
215 rgw_cache_entry_info cache_info;
216
217 RGWBucketInfo bucket_info;
218 map<string, bufferlist> attrs;
219
220 int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi,
221 bucket_key,
222 &bucket_info,
223 nullptr,
224 &attrs,
225 y,
b3b6e05e 226 dpp,
9f95a23c
TL
227 &cache_info);
228 if (r < 0) {
229 if (r != -ENOENT) {
b3b6e05e 230 ldpp_dout(dpp, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl;
9f95a23c
TL
231 }
232 return r;
233 }
234
235 auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone);
236 if (!zone_policy_handler) {
b3b6e05e 237 ldpp_dout(dpp, 20) << "ERROR: could not find policy handler for zone=" << zone << dendl;
9f95a23c
TL
238 return -ENOENT;
239 }
240
241 e.handler.reset(zone_policy_handler->alloc_child(bucket_info, std::move(attrs)));
242
b3b6e05e 243 r = e.handler->init(dpp, y);
9f95a23c 244 if (r < 0) {
b3b6e05e 245 ldpp_dout(dpp, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl;
9f95a23c
TL
246 return r;
247 }
248
249 temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler);
250
251 rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket);
252
253 r = resolve_policy_hints(ctx, self_entity,
254 e.handler,
255 zone_policy_handler,
b3b6e05e 256 temp_map, y, dpp);
9f95a23c 257 if (r < 0) {
b3b6e05e 258 ldpp_dout(dpp, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl;
9f95a23c
TL
259 return r;
260 }
261
b3b6e05e
TL
262 if (!sync_policy_cache->put(dpp, svc.cache, cache_key, &e, {&cache_info})) {
263 ldpp_dout(dpp, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
9f95a23c
TL
264 }
265
266 *handler = e.handler;
267
268 return 0;
269}
270
271int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
272 std::optional<rgw_zone_id> zone,
273 std::optional<rgw_bucket> _bucket,
274 RGWBucketSyncPolicyHandlerRef *handler,
b3b6e05e
TL
275 optional_yield y,
276 const DoutPrefixProvider *dpp)
9f95a23c
TL
277{
278 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef> temp_map;
b3b6e05e 279 return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y, dpp);
9f95a23c
TL
280}
281
282static bool diff_sets(std::set<rgw_bucket>& orig_set,
283 std::set<rgw_bucket>& new_set,
284 vector<rgw_bucket> *added,
285 vector<rgw_bucket> *removed)
286{
287 auto oiter = orig_set.begin();
288 auto niter = new_set.begin();
289
290 while (oiter != orig_set.end() &&
291 niter != new_set.end()) {
292 if (*oiter == *niter) {
293 ++oiter;
294 ++niter;
295 continue;
1e59de90 296 } else if (*oiter < *niter) {
9f95a23c
TL
297 removed->push_back(*oiter);
298 ++oiter;
1e59de90 299 } else {
9f95a23c
TL
300 added->push_back(*niter);
301 ++niter;
302 }
303 }
304 for (; oiter != orig_set.end(); ++oiter) {
305 removed->push_back(*oiter);
306 }
307 for (; niter != new_set.end(); ++niter) {
308 added->push_back(*niter);
309 }
310
311 return !(removed->empty() && added->empty());
312}
313
314
315class RGWSI_BS_SObj_HintIndexObj
316{
317 friend class RGWSI_Bucket_Sync_SObj;
318
319 CephContext *cct;
320 struct {
321 RGWSI_SysObj *sysobj;
322 } svc;
323
9f95a23c
TL
324 rgw_raw_obj obj;
325 RGWSysObj sysobj;
326
327 RGWObjVersionTracker ot;
328
329 bool has_data{false};
330
331public:
332 struct bi_entry {
333 rgw_bucket bucket;
334 map<rgw_bucket /* info_source */, obj_version> sources;
335
336 void encode(bufferlist& bl) const {
337 ENCODE_START(1, 1, bl);
338 encode(bucket, bl);
339 encode(sources, bl);
340 ENCODE_FINISH(bl);
341 }
342
343 void decode(bufferlist::const_iterator& bl) {
344 DECODE_START(1, bl);
345 decode(bucket, bl);
346 decode(sources, bl);
347 DECODE_FINISH(bl);
348 }
349
350 bool add(const rgw_bucket& info_source,
351 const obj_version& info_source_ver) {
352 auto& ver = sources[info_source];
353
354 if (ver == info_source_ver) { /* already updated */
355 return false;
356 }
357
358 if (info_source_ver.tag == ver.tag &&
359 info_source_ver.ver < ver.ver) {
360 return false;
361 }
362
363 ver = info_source_ver;
364
365 return true;
366 }
367
368 bool remove(const rgw_bucket& info_source,
369 const obj_version& info_source_ver) {
370 auto iter = sources.find(info_source);
371 if (iter == sources.end()) {
372 return false;
373 }
374
375 auto& ver = iter->second;
376
377 if (info_source_ver.tag == ver.tag &&
378 info_source_ver.ver < ver.ver) {
379 return false;
380 }
381
382 sources.erase(info_source);
383 return true;
384 }
385
386 bool empty() const {
387 return sources.empty();
388 }
389 };
390
391 struct single_instance_info {
392 map<rgw_bucket, bi_entry> entries;
393
394 void encode(bufferlist& bl) const {
395 ENCODE_START(1, 1, bl);
396 encode(entries, bl);
397 ENCODE_FINISH(bl);
398 }
399
400 void decode(bufferlist::const_iterator& bl) {
401 DECODE_START(1, bl);
402 decode(entries, bl);
403 DECODE_FINISH(bl);
404 }
405
406 bool add_entry(const rgw_bucket& info_source,
407 const obj_version& info_source_ver,
408 const rgw_bucket& bucket) {
409 auto& entry = entries[bucket];
410
411 if (!entry.add(info_source, info_source_ver)) {
412 return false;
413 }
414
415 entry.bucket = bucket;
416
417 return true;
418 }
419
420 bool remove_entry(const rgw_bucket& info_source,
421 const obj_version& info_source_ver,
422 const rgw_bucket& bucket) {
423 auto iter = entries.find(bucket);
424 if (iter == entries.end()) {
425 return false;
426 }
427
428 if (!iter->second.remove(info_source, info_source_ver)) {
429 return false;
430 }
431
432 if (iter->second.empty()) {
433 entries.erase(iter);
434 }
435
436 return true;
437 }
438
439 void clear() {
440 entries.clear();
441 }
442
443 bool empty() const {
444 return entries.empty();
445 }
446
447 void get_entities(std::set<rgw_bucket> *result) const {
448 for (auto& iter : entries) {
449 result->insert(iter.first);
450 }
451 }
452 };
453
454 struct info_map {
455 map<rgw_bucket, single_instance_info> instances;
456
457 void encode(bufferlist& bl) const {
458 ENCODE_START(1, 1, bl);
459 encode(instances, bl);
460 ENCODE_FINISH(bl);
461 }
462
463 void decode(bufferlist::const_iterator& bl) {
464 DECODE_START(1, bl);
465 decode(instances, bl);
466 DECODE_FINISH(bl);
467 }
468
469 bool empty() const {
470 return instances.empty();
471 }
472
473 void clear() {
474 instances.clear();
475 }
476
477 void get_entities(const rgw_bucket& bucket,
478 std::set<rgw_bucket> *result) const {
479 auto iter = instances.find(bucket);
480 if (iter == instances.end()) {
481 return;
482 }
483 iter->second.get_entities(result);
484 }
485 } info;
486
487 RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc,
488 const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()),
9f95a23c 489 obj(_obj),
1e59de90 490 sysobj(_sysobj_svc->get_obj(obj))
9f95a23c
TL
491 {
492 svc.sysobj = _sysobj_svc;
493 }
494
495 template <typename C1, typename C2>
b3b6e05e
TL
496 int update(const DoutPrefixProvider *dpp,
497 const rgw_bucket& entity,
9f95a23c
TL
498 const RGWBucketInfo& info_source,
499 C1 *add,
500 C2 *remove,
501 optional_yield y);
502
503private:
504 template <typename C1, typename C2>
505 void update_entries(const rgw_bucket& info_source,
506 const obj_version& info_source_ver,
507 C1 *add,
508 C2 *remove,
509 single_instance_info *instance);
510
b3b6e05e
TL
511 int read(const DoutPrefixProvider *dpp, optional_yield y);
512 int flush(const DoutPrefixProvider *dpp, optional_yield y);
9f95a23c
TL
513
514 void invalidate() {
515 has_data = false;
516 info.clear();
517 }
518
519 void get_entities(const rgw_bucket& bucket,
520 std::set<rgw_bucket> *result) const {
521 info.get_entities(bucket, result);
522 }
523};
524WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry)
525WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info)
526WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map)
527
528template <typename C1, typename C2>
b3b6e05e
TL
529int RGWSI_BS_SObj_HintIndexObj::update(const DoutPrefixProvider *dpp,
530 const rgw_bucket& entity,
9f95a23c
TL
531 const RGWBucketInfo& info_source,
532 C1 *add,
533 C2 *remove,
534 optional_yield y)
535{
536 int r = 0;
537
538 auto& info_source_ver = info_source.objv_tracker.read_version;
539
540#define MAX_RETRIES 25
541
542 for (int i = 0; i < MAX_RETRIES; ++i) {
543 if (!has_data) {
b3b6e05e 544 r = read(dpp, y);
9f95a23c 545 if (r < 0) {
b3b6e05e 546 ldpp_dout(dpp, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl;
9f95a23c
TL
547 return r;
548 }
549 }
550
551 auto& instance = info.instances[entity];
552
553 update_entries(info_source.bucket,
554 info_source_ver,
555 add, remove,
556 &instance);
557
558 if (instance.empty()) {
559 info.instances.erase(entity);
560 }
561
b3b6e05e 562 r = flush(dpp, y);
9f95a23c
TL
563 if (r >= 0) {
564 return 0;
565 }
566
567 if (r != -ECANCELED) {
b3b6e05e 568 ldpp_dout(dpp, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl;
9f95a23c
TL
569 return r;
570 }
571
572 invalidate();
573 }
b3b6e05e 574 ldpp_dout(dpp, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl;
9f95a23c
TL
575
576 return -EIO;
577}
578
579template <typename C1, typename C2>
580void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source,
581 const obj_version& info_source_ver,
582 C1 *add,
583 C2 *remove,
584 single_instance_info *instance)
585{
586 if (remove) {
587 for (auto& bucket : *remove) {
588 instance->remove_entry(info_source, info_source_ver, bucket);
589 }
590 }
591
592 if (add) {
593 for (auto& bucket : *add) {
594 instance->add_entry(info_source, info_source_ver, bucket);
595 }
596 }
597}
598
b3b6e05e 599int RGWSI_BS_SObj_HintIndexObj::read(const DoutPrefixProvider *dpp, optional_yield y) {
9f95a23c
TL
600 RGWObjVersionTracker _ot;
601 bufferlist bl;
602 int r = sysobj.rop()
603 .set_objv_tracker(&_ot) /* forcing read of current version */
b3b6e05e 604 .read(dpp, &bl, y);
9f95a23c 605 if (r < 0 && r != -ENOENT) {
b3b6e05e 606 ldpp_dout(dpp, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl;
9f95a23c
TL
607 return r;
608 }
609
610 ot = _ot;
611
612 if (r >= 0) {
613 auto iter = bl.cbegin();
614 try {
615 decode(info, iter);
616 has_data = true;
617 } catch (buffer::error& err) {
b3b6e05e 618 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl;
9f95a23c
TL
619 info.clear();
620 }
621 } else {
622 info.clear();
623 }
624
625 return 0;
626}
627
b3b6e05e 628int RGWSI_BS_SObj_HintIndexObj::flush(const DoutPrefixProvider *dpp, optional_yield y) {
9f95a23c
TL
629 int r;
630
631 if (!info.empty()) {
632 bufferlist bl;
633 encode(info, bl);
634
635 r = sysobj.wop()
636 .set_objv_tracker(&ot) /* forcing read of current version */
b3b6e05e 637 .write(dpp, bl, y);
9f95a23c
TL
638
639 } else { /* remove */
640 r = sysobj.wop()
641 .set_objv_tracker(&ot)
b3b6e05e 642 .remove(dpp, y);
9f95a23c
TL
643 }
644
645 if (r < 0) {
646 return r;
647 }
648
649 return 0;
650}
651
652rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const
653{
654 rgw_bucket b = bucket;
655 b.bucket_id.clear();
656 return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
657 bucket_sync_sources_oid_prefix + "." + b.get_key());
658}
659
660rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const
661{
662 rgw_bucket b = bucket;
663 b.bucket_id.clear();
664 return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
665 bucket_sync_targets_oid_prefix + "." + b.get_key());
666}
667
668template <typename C1, typename C2>
b3b6e05e
TL
669int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const DoutPrefixProvider *dpp,
670 const RGWBucketInfo& bucket_info,
9f95a23c
TL
671 C1& added_dests,
672 C2& removed_dests,
673 C1& added_sources,
674 C2& removed_sources,
675 optional_yield y)
676{
677 C1 self_entity = { bucket_info.bucket };
678
679 if (!added_dests.empty() ||
680 !removed_dests.empty()) {
681 /* update our dests */
682 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
683 get_dests_obj(bucket_info.bucket));
b3b6e05e 684 int r = index.update(dpp, bucket_info.bucket,
9f95a23c
TL
685 bucket_info,
686 &added_dests,
687 &removed_dests,
688 y);
689 if (r < 0) {
b3b6e05e 690 ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
9f95a23c
TL
691 return r;
692 }
693
694 /* update dest buckets */
695 for (auto& dest_bucket : added_dests) {
696 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
697 get_sources_obj(dest_bucket));
b3b6e05e 698 int r = dep_index.update(dpp, dest_bucket,
9f95a23c
TL
699 bucket_info,
700 &self_entity,
701 static_cast<C2 *>(nullptr),
702 y);
703 if (r < 0) {
b3b6e05e 704 ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
9f95a23c
TL
705 return r;
706 }
707 }
708 /* update removed dest buckets */
709 for (auto& dest_bucket : removed_dests) {
710 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
711 get_sources_obj(dest_bucket));
b3b6e05e 712 int r = dep_index.update(dpp, dest_bucket,
9f95a23c
TL
713 bucket_info,
714 static_cast<C1 *>(nullptr),
715 &self_entity,
716 y);
717 if (r < 0) {
b3b6e05e 718 ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
9f95a23c
TL
719 return r;
720 }
721 }
722 }
723
724 if (!added_sources.empty() ||
725 !removed_sources.empty()) {
726 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
727 get_sources_obj(bucket_info.bucket));
728 /* update our sources */
b3b6e05e 729 int r = index.update(dpp, bucket_info.bucket,
9f95a23c
TL
730 bucket_info,
731 &added_sources,
732 &removed_sources,
733 y);
734 if (r < 0) {
b3b6e05e 735 ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
9f95a23c
TL
736 return r;
737 }
738
739 /* update added sources buckets */
740 for (auto& source_bucket : added_sources) {
741 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
742 get_dests_obj(source_bucket));
b3b6e05e 743 int r = dep_index.update(dpp, source_bucket,
9f95a23c
TL
744 bucket_info,
745 &self_entity,
746 static_cast<C2 *>(nullptr),
747 y);
748 if (r < 0) {
b3b6e05e 749 ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
9f95a23c
TL
750 return r;
751 }
752 }
753 /* update removed dest buckets */
754 for (auto& source_bucket : removed_sources) {
755 RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
756 get_dests_obj(source_bucket));
b3b6e05e 757 int r = dep_index.update(dpp, source_bucket,
9f95a23c
TL
758 bucket_info,
759 static_cast<C1 *>(nullptr),
760 &self_entity,
761 y);
762 if (r < 0) {
b3b6e05e 763 ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
9f95a23c
TL
764 return r;
765 }
766 }
767 }
768
769 return 0;
770}
771
b3b6e05e
TL
772int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const DoutPrefixProvider *dpp,
773 const RGWBucketInfo& bucket_info,
9f95a23c
TL
774 optional_yield y)
775{
776 std::set<rgw_bucket> sources_set;
777 std::set<rgw_bucket> dests_set;
778
779 if (bucket_info.sync_policy) {
780 bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
781 &sources_set,
782 &dests_set);
783 }
784
785 std::vector<rgw_bucket> removed_sources;
786 removed_sources.reserve(sources_set.size());
787 for (auto& e : sources_set) {
788 removed_sources.push_back(e);
789 }
790
791 std::vector<rgw_bucket> removed_dests;
792 removed_dests.reserve(dests_set.size());
793 for (auto& e : dests_set) {
794 removed_dests.push_back(e);
795 }
796
797 std::vector<rgw_bucket> added_sources;
798 std::vector<rgw_bucket> added_dests;
799
b3b6e05e 800 return hint_index_mgr->update_hints(dpp, bucket_info,
9f95a23c
TL
801 added_dests,
802 removed_dests,
803 added_sources,
804 removed_sources,
805 y);
806}
807
b3b6e05e
TL
808int RGWSI_Bucket_Sync_SObj::handle_bi_update(const DoutPrefixProvider *dpp,
809 RGWBucketInfo& bucket_info,
9f95a23c
TL
810 RGWBucketInfo *orig_bucket_info,
811 optional_yield y)
812{
813 std::set<rgw_bucket> orig_sources;
814 std::set<rgw_bucket> orig_dests;
815
816 if (orig_bucket_info &&
817 orig_bucket_info->sync_policy) {
818 orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket,
819 &orig_sources,
820 &orig_dests);
821 }
822
823 std::set<rgw_bucket> sources;
824 std::set<rgw_bucket> dests;
825 if (bucket_info.sync_policy) {
826 bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
827 &sources,
828 &dests);
829 }
830
831 std::vector<rgw_bucket> removed_sources;
832 std::vector<rgw_bucket> added_sources;
833 bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources);
b3b6e05e
TL
834 ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl;
835 ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl;
9f95a23c
TL
836
837 std::vector<rgw_bucket> removed_dests;
838 std::vector<rgw_bucket> added_dests;
839 found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests);
840
b3b6e05e
TL
841 ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl;
842 ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl;
9f95a23c
TL
843
844 if (!found) {
845 return 0;
846 }
847
b3b6e05e 848 return hint_index_mgr->update_hints(dpp, bucket_info,
9f95a23c
TL
849 dests, /* set all dests, not just the ones that were added */
850 removed_dests,
851 sources, /* set all sources, not just that the ones that were added */
852 removed_sources,
853 y);
854}
855
b3b6e05e
TL
856int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const DoutPrefixProvider *dpp,
857 const rgw_bucket& bucket,
9f95a23c
TL
858 std::set<rgw_bucket> *sources,
859 std::set<rgw_bucket> *dests,
860 optional_yield y)
861{
862 if (!sources && !dests) {
863 return 0;
864 }
865
866 if (sources) {
867 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
868 hint_index_mgr->get_sources_obj(bucket));
b3b6e05e 869 int r = index.read(dpp, y);
9f95a23c 870 if (r < 0) {
b3b6e05e 871 ldpp_dout(dpp, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl;
9f95a23c
TL
872 return r;
873 }
874
875 index.get_entities(bucket, sources);
876
877 if (!bucket.bucket_id.empty()) {
878 rgw_bucket b = bucket;
879 b.bucket_id.clear();
880 index.get_entities(b, sources);
881 }
882 }
883
884 if (dests) {
885 RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
886 hint_index_mgr->get_dests_obj(bucket));
b3b6e05e 887 int r = index.read(dpp, y);
9f95a23c 888 if (r < 0) {
b3b6e05e 889 ldpp_dout(dpp, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl;
9f95a23c
TL
890 return r;
891 }
892
893 index.get_entities(bucket, dests);
894
895 if (!bucket.bucket_id.empty()) {
896 rgw_bucket b = bucket;
897 b.bucket_id.clear();
898 index.get_entities(b, dests);
899 }
900 }
901
902 return 0;
903}