]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | #include "svc_bucket_sync_sobj.h" |
2 | #include "svc_zone.h" | |
3 | #include "svc_sys_obj_cache.h" | |
4 | #include "svc_bucket_sobj.h" | |
5 | ||
6 | #include "rgw/rgw_bucket_sync.h" | |
7 | #include "rgw/rgw_zone.h" | |
8 | #include "rgw/rgw_sync_policy.h" | |
9 | ||
10 | #define dout_subsys ceph_subsys_rgw | |
11 | ||
12 | static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints"; | |
13 | static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints"; | |
14 | ||
15 | class RGWSI_Bucket_Sync_SObj_HintIndexManager { | |
16 | CephContext *cct; | |
17 | ||
18 | struct { | |
19 | RGWSI_Zone *zone; | |
20 | RGWSI_SysObj *sysobj; | |
21 | } svc; | |
22 | ||
23 | public: | |
24 | RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone *_zone_svc, | |
25 | RGWSI_SysObj *_sysobj_svc) { | |
26 | svc.zone = _zone_svc; | |
27 | svc.sysobj = _sysobj_svc; | |
28 | ||
29 | cct = svc.zone->ctx(); | |
30 | } | |
31 | ||
32 | rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const; | |
33 | rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const; | |
34 | ||
35 | template <typename C1, typename C2> | |
36 | int update_hints(const RGWBucketInfo& bucket_info, | |
37 | C1& added_dests, | |
38 | C2& removed_dests, | |
39 | C1& added_sources, | |
40 | C2& removed_sources, | |
41 | optional_yield y); | |
42 | }; | |
43 | ||
44 | RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) { | |
45 | } | |
46 | RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() { | |
47 | } | |
48 | ||
49 | void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc, | |
50 | RGWSI_SysObj *_sysobj_svc, | |
51 | RGWSI_SysObj_Cache *_cache_svc, | |
52 | RGWSI_Bucket_SObj *bucket_sobj_svc) | |
53 | { | |
54 | svc.zone = _zone_svc; | |
55 | svc.sysobj = _sysobj_svc; | |
56 | svc.cache = _cache_svc; | |
57 | svc.bucket_sobj = bucket_sobj_svc; | |
58 | ||
59 | hint_index_mgr.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc.zone, svc.sysobj)); | |
60 | } | |
61 | ||
f67539c2 | 62 | int RGWSI_Bucket_Sync_SObj::do_start(optional_yield) |
9f95a23c TL |
63 | { |
64 | sync_policy_cache.reset(new RGWChainedCacheImpl<bucket_sync_policy_cache_entry>); | |
65 | sync_policy_cache->init(svc.cache); | |
66 | ||
67 | return 0; | |
68 | } | |
69 | ||
70 | void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx, | |
71 | const std::set<rgw_zone_id>& zones, | |
72 | const std::set<rgw_bucket>& buckets, | |
73 | std::set<rgw_sync_bucket_entity> *hint_entities, | |
74 | optional_yield y) | |
75 | { | |
76 | vector<rgw_bucket> hint_buckets; | |
77 | ||
78 | hint_buckets.reserve(buckets.size()); | |
79 | ||
80 | for (auto& b : buckets) { | |
81 | RGWBucketInfo hint_bucket_info; | |
82 | int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info, | |
83 | nullptr, nullptr, boost::none, | |
84 | y); | |
85 | if (ret < 0) { | |
86 | ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl; | |
87 | continue; | |
88 | } | |
89 | ||
90 | hint_buckets.emplace_back(std::move(hint_bucket_info.bucket)); | |
91 | } | |
92 | ||
93 | for (auto& zone : zones) { | |
94 | for (auto& b : hint_buckets) { | |
95 | hint_entities->insert(rgw_sync_bucket_entity(zone, b)); | |
96 | } | |
97 | } | |
98 | } | |
99 | ||
100 | int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx, | |
101 | rgw_sync_bucket_entity& self_entity, | |
102 | RGWBucketSyncPolicyHandlerRef& handler, | |
103 | RGWBucketSyncPolicyHandlerRef& zone_policy_handler, | |
104 | std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map, | |
105 | optional_yield y) | |
106 | { | |
107 | set<rgw_zone_id> source_zones; | |
108 | set<rgw_zone_id> target_zones; | |
109 | ||
110 | zone_policy_handler->reflect(nullptr, nullptr, | |
111 | nullptr, nullptr, | |
112 | &source_zones, | |
113 | &target_zones, | |
114 | false); /* relaxed: also get all zones that we allow to sync to/from */ | |
115 | ||
116 | std::set<rgw_sync_bucket_entity> hint_entities; | |
117 | ||
118 | get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y); | |
119 | get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y); | |
120 | ||
121 | std::set<rgw_sync_bucket_pipe> resolved_sources; | |
122 | std::set<rgw_sync_bucket_pipe> resolved_dests; | |
123 | ||
124 | for (auto& hint_entity : hint_entities) { | |
125 | if (!hint_entity.zone || | |
126 | !hint_entity.bucket) { | |
127 | continue; /* shouldn't really happen */ | |
128 | } | |
129 | ||
130 | auto& zid = *hint_entity.zone; | |
131 | auto& hint_bucket = *hint_entity.bucket; | |
132 | ||
133 | RGWBucketSyncPolicyHandlerRef hint_bucket_handler; | |
134 | ||
135 | auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket)); | |
136 | if (iter != temp_map.end()) { | |
137 | hint_bucket_handler = iter->second; | |
138 | } else { | |
139 | int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y); | |
140 | if (r < 0) { | |
141 | ldout(cct, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl; | |
142 | continue; | |
143 | } | |
144 | } | |
145 | ||
146 | hint_bucket_handler->get_pipes(&resolved_dests, | |
147 | &resolved_sources, | |
148 | self_entity); /* flipping resolved dests and sources as these are | |
149 | relative to the remote entity */ | |
150 | } | |
151 | ||
152 | handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests)); | |
153 | ||
154 | return 0; | |
155 | } | |
156 | ||
157 | int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, | |
158 | std::optional<rgw_zone_id> zone, | |
159 | std::optional<rgw_bucket> _bucket, | |
160 | std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map, | |
161 | RGWBucketSyncPolicyHandlerRef *handler, | |
162 | optional_yield y) | |
163 | { | |
164 | if (!_bucket) { | |
165 | *handler = svc.zone->get_sync_policy_handler(zone); | |
166 | return 0; | |
167 | } | |
168 | ||
169 | auto& bucket = *_bucket; | |
170 | ||
171 | string zone_key; | |
172 | string bucket_key; | |
173 | ||
174 | if (zone && *zone != svc.zone->zone_id()) { | |
175 | zone_key = zone->id; | |
176 | } | |
177 | ||
178 | bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket); | |
179 | ||
180 | string cache_key("bi/" + zone_key + "/" + bucket_key); | |
181 | ||
182 | if (auto e = sync_policy_cache->find(cache_key)) { | |
183 | *handler = e->handler; | |
184 | return 0; | |
185 | } | |
186 | ||
187 | bucket_sync_policy_cache_entry e; | |
188 | rgw_cache_entry_info cache_info; | |
189 | ||
190 | RGWBucketInfo bucket_info; | |
191 | map<string, bufferlist> attrs; | |
192 | ||
193 | int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi, | |
194 | bucket_key, | |
195 | &bucket_info, | |
196 | nullptr, | |
197 | &attrs, | |
198 | y, | |
199 | &cache_info); | |
200 | if (r < 0) { | |
201 | if (r != -ENOENT) { | |
202 | ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl; | |
203 | } | |
204 | return r; | |
205 | } | |
206 | ||
207 | auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone); | |
208 | if (!zone_policy_handler) { | |
209 | ldout(cct, 20) << "ERROR: could not find policy handler for zone=" << zone << dendl; | |
210 | return -ENOENT; | |
211 | } | |
212 | ||
213 | e.handler.reset(zone_policy_handler->alloc_child(bucket_info, std::move(attrs))); | |
214 | ||
215 | r = e.handler->init(y); | |
216 | if (r < 0) { | |
217 | ldout(cct, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl; | |
218 | return r; | |
219 | } | |
220 | ||
221 | temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler); | |
222 | ||
223 | rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket); | |
224 | ||
225 | r = resolve_policy_hints(ctx, self_entity, | |
226 | e.handler, | |
227 | zone_policy_handler, | |
228 | temp_map, y); | |
229 | if (r < 0) { | |
230 | ldout(cct, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl; | |
231 | return r; | |
232 | } | |
233 | ||
234 | if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) { | |
235 | ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; | |
236 | } | |
237 | ||
238 | *handler = e.handler; | |
239 | ||
240 | return 0; | |
241 | } | |
242 | ||
243 | int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, | |
244 | std::optional<rgw_zone_id> zone, | |
245 | std::optional<rgw_bucket> _bucket, | |
246 | RGWBucketSyncPolicyHandlerRef *handler, | |
247 | optional_yield y) | |
248 | { | |
249 | std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef> temp_map; | |
250 | return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y); | |
251 | } | |
252 | ||
253 | static bool diff_sets(std::set<rgw_bucket>& orig_set, | |
254 | std::set<rgw_bucket>& new_set, | |
255 | vector<rgw_bucket> *added, | |
256 | vector<rgw_bucket> *removed) | |
257 | { | |
258 | auto oiter = orig_set.begin(); | |
259 | auto niter = new_set.begin(); | |
260 | ||
261 | while (oiter != orig_set.end() && | |
262 | niter != new_set.end()) { | |
263 | if (*oiter == *niter) { | |
264 | ++oiter; | |
265 | ++niter; | |
266 | continue; | |
267 | } | |
268 | while (*oiter < *niter && | |
269 | oiter != orig_set.end()) { | |
270 | removed->push_back(*oiter); | |
271 | ++oiter; | |
272 | } | |
273 | while (*niter < *oiter | |
274 | && niter != new_set.end()) { | |
275 | added->push_back(*niter); | |
276 | ++niter; | |
277 | } | |
278 | } | |
279 | for (; oiter != orig_set.end(); ++oiter) { | |
280 | removed->push_back(*oiter); | |
281 | } | |
282 | for (; niter != new_set.end(); ++niter) { | |
283 | added->push_back(*niter); | |
284 | } | |
285 | ||
286 | return !(removed->empty() && added->empty()); | |
287 | } | |
288 | ||
289 | ||
290 | class RGWSI_BS_SObj_HintIndexObj | |
291 | { | |
292 | friend class RGWSI_Bucket_Sync_SObj; | |
293 | ||
294 | CephContext *cct; | |
295 | struct { | |
296 | RGWSI_SysObj *sysobj; | |
297 | } svc; | |
298 | ||
299 | RGWSysObjectCtx obj_ctx; | |
300 | rgw_raw_obj obj; | |
301 | RGWSysObj sysobj; | |
302 | ||
303 | RGWObjVersionTracker ot; | |
304 | ||
305 | bool has_data{false}; | |
306 | ||
307 | public: | |
308 | struct bi_entry { | |
309 | rgw_bucket bucket; | |
310 | map<rgw_bucket /* info_source */, obj_version> sources; | |
311 | ||
312 | void encode(bufferlist& bl) const { | |
313 | ENCODE_START(1, 1, bl); | |
314 | encode(bucket, bl); | |
315 | encode(sources, bl); | |
316 | ENCODE_FINISH(bl); | |
317 | } | |
318 | ||
319 | void decode(bufferlist::const_iterator& bl) { | |
320 | DECODE_START(1, bl); | |
321 | decode(bucket, bl); | |
322 | decode(sources, bl); | |
323 | DECODE_FINISH(bl); | |
324 | } | |
325 | ||
326 | bool add(const rgw_bucket& info_source, | |
327 | const obj_version& info_source_ver) { | |
328 | auto& ver = sources[info_source]; | |
329 | ||
330 | if (ver == info_source_ver) { /* already updated */ | |
331 | return false; | |
332 | } | |
333 | ||
334 | if (info_source_ver.tag == ver.tag && | |
335 | info_source_ver.ver < ver.ver) { | |
336 | return false; | |
337 | } | |
338 | ||
339 | ver = info_source_ver; | |
340 | ||
341 | return true; | |
342 | } | |
343 | ||
344 | bool remove(const rgw_bucket& info_source, | |
345 | const obj_version& info_source_ver) { | |
346 | auto iter = sources.find(info_source); | |
347 | if (iter == sources.end()) { | |
348 | return false; | |
349 | } | |
350 | ||
351 | auto& ver = iter->second; | |
352 | ||
353 | if (info_source_ver.tag == ver.tag && | |
354 | info_source_ver.ver < ver.ver) { | |
355 | return false; | |
356 | } | |
357 | ||
358 | sources.erase(info_source); | |
359 | return true; | |
360 | } | |
361 | ||
362 | bool empty() const { | |
363 | return sources.empty(); | |
364 | } | |
365 | }; | |
366 | ||
367 | struct single_instance_info { | |
368 | map<rgw_bucket, bi_entry> entries; | |
369 | ||
370 | void encode(bufferlist& bl) const { | |
371 | ENCODE_START(1, 1, bl); | |
372 | encode(entries, bl); | |
373 | ENCODE_FINISH(bl); | |
374 | } | |
375 | ||
376 | void decode(bufferlist::const_iterator& bl) { | |
377 | DECODE_START(1, bl); | |
378 | decode(entries, bl); | |
379 | DECODE_FINISH(bl); | |
380 | } | |
381 | ||
382 | bool add_entry(const rgw_bucket& info_source, | |
383 | const obj_version& info_source_ver, | |
384 | const rgw_bucket& bucket) { | |
385 | auto& entry = entries[bucket]; | |
386 | ||
387 | if (!entry.add(info_source, info_source_ver)) { | |
388 | return false; | |
389 | } | |
390 | ||
391 | entry.bucket = bucket; | |
392 | ||
393 | return true; | |
394 | } | |
395 | ||
396 | bool remove_entry(const rgw_bucket& info_source, | |
397 | const obj_version& info_source_ver, | |
398 | const rgw_bucket& bucket) { | |
399 | auto iter = entries.find(bucket); | |
400 | if (iter == entries.end()) { | |
401 | return false; | |
402 | } | |
403 | ||
404 | if (!iter->second.remove(info_source, info_source_ver)) { | |
405 | return false; | |
406 | } | |
407 | ||
408 | if (iter->second.empty()) { | |
409 | entries.erase(iter); | |
410 | } | |
411 | ||
412 | return true; | |
413 | } | |
414 | ||
415 | void clear() { | |
416 | entries.clear(); | |
417 | } | |
418 | ||
419 | bool empty() const { | |
420 | return entries.empty(); | |
421 | } | |
422 | ||
423 | void get_entities(std::set<rgw_bucket> *result) const { | |
424 | for (auto& iter : entries) { | |
425 | result->insert(iter.first); | |
426 | } | |
427 | } | |
428 | }; | |
429 | ||
430 | struct info_map { | |
431 | map<rgw_bucket, single_instance_info> instances; | |
432 | ||
433 | void encode(bufferlist& bl) const { | |
434 | ENCODE_START(1, 1, bl); | |
435 | encode(instances, bl); | |
436 | ENCODE_FINISH(bl); | |
437 | } | |
438 | ||
439 | void decode(bufferlist::const_iterator& bl) { | |
440 | DECODE_START(1, bl); | |
441 | decode(instances, bl); | |
442 | DECODE_FINISH(bl); | |
443 | } | |
444 | ||
445 | bool empty() const { | |
446 | return instances.empty(); | |
447 | } | |
448 | ||
449 | void clear() { | |
450 | instances.clear(); | |
451 | } | |
452 | ||
453 | void get_entities(const rgw_bucket& bucket, | |
454 | std::set<rgw_bucket> *result) const { | |
455 | auto iter = instances.find(bucket); | |
456 | if (iter == instances.end()) { | |
457 | return; | |
458 | } | |
459 | iter->second.get_entities(result); | |
460 | } | |
461 | } info; | |
462 | ||
463 | RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc, | |
464 | const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()), | |
465 | obj_ctx(_sysobj_svc->init_obj_ctx()), | |
466 | obj(_obj), | |
467 | sysobj(obj_ctx.get_obj(obj)) | |
468 | { | |
469 | svc.sysobj = _sysobj_svc; | |
470 | } | |
471 | ||
472 | template <typename C1, typename C2> | |
473 | int update(const rgw_bucket& entity, | |
474 | const RGWBucketInfo& info_source, | |
475 | C1 *add, | |
476 | C2 *remove, | |
477 | optional_yield y); | |
478 | ||
479 | private: | |
480 | template <typename C1, typename C2> | |
481 | void update_entries(const rgw_bucket& info_source, | |
482 | const obj_version& info_source_ver, | |
483 | C1 *add, | |
484 | C2 *remove, | |
485 | single_instance_info *instance); | |
486 | ||
487 | int read(optional_yield y); | |
488 | int flush(optional_yield y); | |
489 | ||
490 | void invalidate() { | |
491 | has_data = false; | |
492 | info.clear(); | |
493 | } | |
494 | ||
495 | void get_entities(const rgw_bucket& bucket, | |
496 | std::set<rgw_bucket> *result) const { | |
497 | info.get_entities(bucket, result); | |
498 | } | |
499 | }; | |
500 | WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry) | |
501 | WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info) | |
502 | WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map) | |
503 | ||
504 | template <typename C1, typename C2> | |
505 | int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity, | |
506 | const RGWBucketInfo& info_source, | |
507 | C1 *add, | |
508 | C2 *remove, | |
509 | optional_yield y) | |
510 | { | |
511 | int r = 0; | |
512 | ||
513 | auto& info_source_ver = info_source.objv_tracker.read_version; | |
514 | ||
515 | #define MAX_RETRIES 25 | |
516 | ||
517 | for (int i = 0; i < MAX_RETRIES; ++i) { | |
518 | if (!has_data) { | |
519 | r = read(y); | |
520 | if (r < 0) { | |
521 | ldout(cct, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl; | |
522 | return r; | |
523 | } | |
524 | } | |
525 | ||
526 | auto& instance = info.instances[entity]; | |
527 | ||
528 | update_entries(info_source.bucket, | |
529 | info_source_ver, | |
530 | add, remove, | |
531 | &instance); | |
532 | ||
533 | if (instance.empty()) { | |
534 | info.instances.erase(entity); | |
535 | } | |
536 | ||
537 | r = flush(y); | |
538 | if (r >= 0) { | |
539 | return 0; | |
540 | } | |
541 | ||
542 | if (r != -ECANCELED) { | |
543 | ldout(cct, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl; | |
544 | return r; | |
545 | } | |
546 | ||
547 | invalidate(); | |
548 | } | |
549 | ldout(cct, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl; | |
550 | ||
551 | return -EIO; | |
552 | } | |
553 | ||
554 | template <typename C1, typename C2> | |
555 | void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source, | |
556 | const obj_version& info_source_ver, | |
557 | C1 *add, | |
558 | C2 *remove, | |
559 | single_instance_info *instance) | |
560 | { | |
561 | if (remove) { | |
562 | for (auto& bucket : *remove) { | |
563 | instance->remove_entry(info_source, info_source_ver, bucket); | |
564 | } | |
565 | } | |
566 | ||
567 | if (add) { | |
568 | for (auto& bucket : *add) { | |
569 | instance->add_entry(info_source, info_source_ver, bucket); | |
570 | } | |
571 | } | |
572 | } | |
573 | ||
574 | int RGWSI_BS_SObj_HintIndexObj::read(optional_yield y) { | |
575 | RGWObjVersionTracker _ot; | |
576 | bufferlist bl; | |
577 | int r = sysobj.rop() | |
578 | .set_objv_tracker(&_ot) /* forcing read of current version */ | |
579 | .read(&bl, y); | |
580 | if (r < 0 && r != -ENOENT) { | |
581 | ldout(cct, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl; | |
582 | return r; | |
583 | } | |
584 | ||
585 | ot = _ot; | |
586 | ||
587 | if (r >= 0) { | |
588 | auto iter = bl.cbegin(); | |
589 | try { | |
590 | decode(info, iter); | |
591 | has_data = true; | |
592 | } catch (buffer::error& err) { | |
593 | ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl; | |
594 | info.clear(); | |
595 | } | |
596 | } else { | |
597 | info.clear(); | |
598 | } | |
599 | ||
600 | return 0; | |
601 | } | |
602 | ||
603 | int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) { | |
604 | int r; | |
605 | ||
606 | if (!info.empty()) { | |
607 | bufferlist bl; | |
608 | encode(info, bl); | |
609 | ||
610 | r = sysobj.wop() | |
611 | .set_objv_tracker(&ot) /* forcing read of current version */ | |
612 | .write(bl, y); | |
613 | ||
614 | } else { /* remove */ | |
615 | r = sysobj.wop() | |
616 | .set_objv_tracker(&ot) | |
617 | .remove(y); | |
618 | } | |
619 | ||
620 | if (r < 0) { | |
621 | return r; | |
622 | } | |
623 | ||
624 | return 0; | |
625 | } | |
626 | ||
627 | rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const | |
628 | { | |
629 | rgw_bucket b = bucket; | |
630 | b.bucket_id.clear(); | |
631 | return rgw_raw_obj(svc.zone->get_zone_params().log_pool, | |
632 | bucket_sync_sources_oid_prefix + "." + b.get_key()); | |
633 | } | |
634 | ||
635 | rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const | |
636 | { | |
637 | rgw_bucket b = bucket; | |
638 | b.bucket_id.clear(); | |
639 | return rgw_raw_obj(svc.zone->get_zone_params().log_pool, | |
640 | bucket_sync_targets_oid_prefix + "." + b.get_key()); | |
641 | } | |
642 | ||
643 | template <typename C1, typename C2> | |
644 | int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const RGWBucketInfo& bucket_info, | |
645 | C1& added_dests, | |
646 | C2& removed_dests, | |
647 | C1& added_sources, | |
648 | C2& removed_sources, | |
649 | optional_yield y) | |
650 | { | |
651 | C1 self_entity = { bucket_info.bucket }; | |
652 | ||
653 | if (!added_dests.empty() || | |
654 | !removed_dests.empty()) { | |
655 | /* update our dests */ | |
656 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
657 | get_dests_obj(bucket_info.bucket)); | |
658 | int r = index.update(bucket_info.bucket, | |
659 | bucket_info, | |
660 | &added_dests, | |
661 | &removed_dests, | |
662 | y); | |
663 | if (r < 0) { | |
664 | ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; | |
665 | return r; | |
666 | } | |
667 | ||
668 | /* update dest buckets */ | |
669 | for (auto& dest_bucket : added_dests) { | |
670 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
671 | get_sources_obj(dest_bucket)); | |
672 | int r = dep_index.update(dest_bucket, | |
673 | bucket_info, | |
674 | &self_entity, | |
675 | static_cast<C2 *>(nullptr), | |
676 | y); | |
677 | if (r < 0) { | |
678 | ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; | |
679 | return r; | |
680 | } | |
681 | } | |
682 | /* update removed dest buckets */ | |
683 | for (auto& dest_bucket : removed_dests) { | |
684 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
685 | get_sources_obj(dest_bucket)); | |
686 | int r = dep_index.update(dest_bucket, | |
687 | bucket_info, | |
688 | static_cast<C1 *>(nullptr), | |
689 | &self_entity, | |
690 | y); | |
691 | if (r < 0) { | |
692 | ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; | |
693 | return r; | |
694 | } | |
695 | } | |
696 | } | |
697 | ||
698 | if (!added_sources.empty() || | |
699 | !removed_sources.empty()) { | |
700 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
701 | get_sources_obj(bucket_info.bucket)); | |
702 | /* update our sources */ | |
703 | int r = index.update(bucket_info.bucket, | |
704 | bucket_info, | |
705 | &added_sources, | |
706 | &removed_sources, | |
707 | y); | |
708 | if (r < 0) { | |
709 | ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; | |
710 | return r; | |
711 | } | |
712 | ||
713 | /* update added sources buckets */ | |
714 | for (auto& source_bucket : added_sources) { | |
715 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
716 | get_dests_obj(source_bucket)); | |
717 | int r = dep_index.update(source_bucket, | |
718 | bucket_info, | |
719 | &self_entity, | |
720 | static_cast<C2 *>(nullptr), | |
721 | y); | |
722 | if (r < 0) { | |
723 | ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; | |
724 | return r; | |
725 | } | |
726 | } | |
727 | /* update removed dest buckets */ | |
728 | for (auto& source_bucket : removed_sources) { | |
729 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
730 | get_dests_obj(source_bucket)); | |
731 | int r = dep_index.update(source_bucket, | |
732 | bucket_info, | |
733 | static_cast<C1 *>(nullptr), | |
734 | &self_entity, | |
735 | y); | |
736 | if (r < 0) { | |
737 | ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; | |
738 | return r; | |
739 | } | |
740 | } | |
741 | } | |
742 | ||
743 | return 0; | |
744 | } | |
745 | ||
746 | int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info, | |
747 | optional_yield y) | |
748 | { | |
749 | std::set<rgw_bucket> sources_set; | |
750 | std::set<rgw_bucket> dests_set; | |
751 | ||
752 | if (bucket_info.sync_policy) { | |
753 | bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket, | |
754 | &sources_set, | |
755 | &dests_set); | |
756 | } | |
757 | ||
758 | std::vector<rgw_bucket> removed_sources; | |
759 | removed_sources.reserve(sources_set.size()); | |
760 | for (auto& e : sources_set) { | |
761 | removed_sources.push_back(e); | |
762 | } | |
763 | ||
764 | std::vector<rgw_bucket> removed_dests; | |
765 | removed_dests.reserve(dests_set.size()); | |
766 | for (auto& e : dests_set) { | |
767 | removed_dests.push_back(e); | |
768 | } | |
769 | ||
770 | std::vector<rgw_bucket> added_sources; | |
771 | std::vector<rgw_bucket> added_dests; | |
772 | ||
773 | return hint_index_mgr->update_hints(bucket_info, | |
774 | added_dests, | |
775 | removed_dests, | |
776 | added_sources, | |
777 | removed_sources, | |
778 | y); | |
779 | } | |
780 | ||
781 | int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info, | |
782 | RGWBucketInfo *orig_bucket_info, | |
783 | optional_yield y) | |
784 | { | |
785 | std::set<rgw_bucket> orig_sources; | |
786 | std::set<rgw_bucket> orig_dests; | |
787 | ||
788 | if (orig_bucket_info && | |
789 | orig_bucket_info->sync_policy) { | |
790 | orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket, | |
791 | &orig_sources, | |
792 | &orig_dests); | |
793 | } | |
794 | ||
795 | std::set<rgw_bucket> sources; | |
796 | std::set<rgw_bucket> dests; | |
797 | if (bucket_info.sync_policy) { | |
798 | bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket, | |
799 | &sources, | |
800 | &dests); | |
801 | } | |
802 | ||
803 | std::vector<rgw_bucket> removed_sources; | |
804 | std::vector<rgw_bucket> added_sources; | |
805 | bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources); | |
806 | ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl; | |
807 | ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl; | |
808 | ||
809 | std::vector<rgw_bucket> removed_dests; | |
810 | std::vector<rgw_bucket> added_dests; | |
811 | found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests); | |
812 | ||
813 | ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl; | |
814 | ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl; | |
815 | ||
816 | if (!found) { | |
817 | return 0; | |
818 | } | |
819 | ||
820 | return hint_index_mgr->update_hints(bucket_info, | |
821 | dests, /* set all dests, not just the ones that were added */ | |
822 | removed_dests, | |
823 | sources, /* set all sources, not just that the ones that were added */ | |
824 | removed_sources, | |
825 | y); | |
826 | } | |
827 | ||
828 | int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket, | |
829 | std::set<rgw_bucket> *sources, | |
830 | std::set<rgw_bucket> *dests, | |
831 | optional_yield y) | |
832 | { | |
833 | if (!sources && !dests) { | |
834 | return 0; | |
835 | } | |
836 | ||
837 | if (sources) { | |
838 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
839 | hint_index_mgr->get_sources_obj(bucket)); | |
840 | int r = index.read(y); | |
841 | if (r < 0) { | |
842 | ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl; | |
843 | return r; | |
844 | } | |
845 | ||
846 | index.get_entities(bucket, sources); | |
847 | ||
848 | if (!bucket.bucket_id.empty()) { | |
849 | rgw_bucket b = bucket; | |
850 | b.bucket_id.clear(); | |
851 | index.get_entities(b, sources); | |
852 | } | |
853 | } | |
854 | ||
855 | if (dests) { | |
856 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
857 | hint_index_mgr->get_dests_obj(bucket)); | |
858 | int r = index.read(y); | |
859 | if (r < 0) { | |
860 | ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl; | |
861 | return r; | |
862 | } | |
863 | ||
864 | index.get_entities(bucket, dests); | |
865 | ||
866 | if (!bucket.bucket_id.empty()) { | |
867 | rgw_bucket b = bucket; | |
868 | b.bucket_id.clear(); | |
869 | index.get_entities(b, dests); | |
870 | } | |
871 | } | |
872 | ||
873 | return 0; | |
874 | } |