]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | #include "svc_bucket_sync_sobj.h" |
2 | #include "svc_zone.h" | |
3 | #include "svc_sys_obj_cache.h" | |
4 | #include "svc_bucket_sobj.h" | |
5 | ||
1e59de90 TL |
6 | #include "rgw_bucket_sync.h" |
7 | #include "rgw_zone.h" | |
8 | #include "rgw_sync_policy.h" | |
9f95a23c TL |
9 | |
10 | #define dout_subsys ceph_subsys_rgw | |
11 | ||
20effc67 TL |
12 | using namespace std; |
13 | ||
9f95a23c TL |
14 | static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints"; |
15 | static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints"; | |
16 | ||
17 | class RGWSI_Bucket_Sync_SObj_HintIndexManager { | |
18 | CephContext *cct; | |
19 | ||
20 | struct { | |
21 | RGWSI_Zone *zone; | |
22 | RGWSI_SysObj *sysobj; | |
23 | } svc; | |
24 | ||
25 | public: | |
26 | RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone *_zone_svc, | |
27 | RGWSI_SysObj *_sysobj_svc) { | |
28 | svc.zone = _zone_svc; | |
29 | svc.sysobj = _sysobj_svc; | |
30 | ||
31 | cct = svc.zone->ctx(); | |
32 | } | |
33 | ||
34 | rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const; | |
35 | rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const; | |
36 | ||
37 | template <typename C1, typename C2> | |
b3b6e05e TL |
38 | int update_hints(const DoutPrefixProvider *dpp, |
39 | const RGWBucketInfo& bucket_info, | |
9f95a23c TL |
40 | C1& added_dests, |
41 | C2& removed_dests, | |
42 | C1& added_sources, | |
43 | C2& removed_sources, | |
44 | optional_yield y); | |
45 | }; | |
46 | ||
47 | RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) { | |
48 | } | |
49 | RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() { | |
50 | } | |
51 | ||
52 | void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc, | |
53 | RGWSI_SysObj *_sysobj_svc, | |
54 | RGWSI_SysObj_Cache *_cache_svc, | |
55 | RGWSI_Bucket_SObj *bucket_sobj_svc) | |
56 | { | |
57 | svc.zone = _zone_svc; | |
58 | svc.sysobj = _sysobj_svc; | |
59 | svc.cache = _cache_svc; | |
60 | svc.bucket_sobj = bucket_sobj_svc; | |
61 | ||
62 | hint_index_mgr.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc.zone, svc.sysobj)); | |
63 | } | |
64 | ||
b3b6e05e | 65 | int RGWSI_Bucket_Sync_SObj::do_start(optional_yield, const DoutPrefixProvider *dpp) |
9f95a23c TL |
66 | { |
67 | sync_policy_cache.reset(new RGWChainedCacheImpl<bucket_sync_policy_cache_entry>); | |
68 | sync_policy_cache->init(svc.cache); | |
69 | ||
70 | return 0; | |
71 | } | |
72 | ||
73 | void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx, | |
74 | const std::set<rgw_zone_id>& zones, | |
75 | const std::set<rgw_bucket>& buckets, | |
76 | std::set<rgw_sync_bucket_entity> *hint_entities, | |
b3b6e05e | 77 | optional_yield y, const DoutPrefixProvider *dpp) |
9f95a23c TL |
78 | { |
79 | vector<rgw_bucket> hint_buckets; | |
80 | ||
81 | hint_buckets.reserve(buckets.size()); | |
82 | ||
83 | for (auto& b : buckets) { | |
84 | RGWBucketInfo hint_bucket_info; | |
85 | int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info, | |
86 | nullptr, nullptr, boost::none, | |
b3b6e05e | 87 | y, dpp); |
9f95a23c | 88 | if (ret < 0) { |
b3b6e05e | 89 | ldpp_dout(dpp, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl; |
9f95a23c TL |
90 | continue; |
91 | } | |
92 | ||
93 | hint_buckets.emplace_back(std::move(hint_bucket_info.bucket)); | |
94 | } | |
95 | ||
96 | for (auto& zone : zones) { | |
97 | for (auto& b : hint_buckets) { | |
98 | hint_entities->insert(rgw_sync_bucket_entity(zone, b)); | |
99 | } | |
100 | } | |
101 | } | |
102 | ||
103 | int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx, | |
104 | rgw_sync_bucket_entity& self_entity, | |
105 | RGWBucketSyncPolicyHandlerRef& handler, | |
106 | RGWBucketSyncPolicyHandlerRef& zone_policy_handler, | |
107 | std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map, | |
b3b6e05e TL |
108 | optional_yield y, |
109 | const DoutPrefixProvider *dpp) | |
9f95a23c TL |
110 | { |
111 | set<rgw_zone_id> source_zones; | |
112 | set<rgw_zone_id> target_zones; | |
113 | ||
20effc67 | 114 | zone_policy_handler->reflect(dpp, nullptr, nullptr, |
9f95a23c TL |
115 | nullptr, nullptr, |
116 | &source_zones, | |
117 | &target_zones, | |
118 | false); /* relaxed: also get all zones that we allow to sync to/from */ | |
119 | ||
120 | std::set<rgw_sync_bucket_entity> hint_entities; | |
121 | ||
b3b6e05e TL |
122 | get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y, dpp); |
123 | get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y, dpp); | |
9f95a23c TL |
124 | |
125 | std::set<rgw_sync_bucket_pipe> resolved_sources; | |
126 | std::set<rgw_sync_bucket_pipe> resolved_dests; | |
127 | ||
128 | for (auto& hint_entity : hint_entities) { | |
129 | if (!hint_entity.zone || | |
130 | !hint_entity.bucket) { | |
131 | continue; /* shouldn't really happen */ | |
132 | } | |
133 | ||
134 | auto& zid = *hint_entity.zone; | |
135 | auto& hint_bucket = *hint_entity.bucket; | |
136 | ||
137 | RGWBucketSyncPolicyHandlerRef hint_bucket_handler; | |
138 | ||
139 | auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket)); | |
140 | if (iter != temp_map.end()) { | |
141 | hint_bucket_handler = iter->second; | |
142 | } else { | |
b3b6e05e | 143 | int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y, dpp); |
9f95a23c | 144 | if (r < 0) { |
b3b6e05e | 145 | ldpp_dout(dpp, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl; |
9f95a23c TL |
146 | continue; |
147 | } | |
148 | } | |
149 | ||
150 | hint_bucket_handler->get_pipes(&resolved_dests, | |
151 | &resolved_sources, | |
152 | self_entity); /* flipping resolved dests and sources as these are | |
153 | relative to the remote entity */ | |
154 | } | |
155 | ||
156 | handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests)); | |
157 | ||
158 | return 0; | |
159 | } | |
160 | ||
161 | int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, | |
162 | std::optional<rgw_zone_id> zone, | |
163 | std::optional<rgw_bucket> _bucket, | |
164 | std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map, | |
165 | RGWBucketSyncPolicyHandlerRef *handler, | |
b3b6e05e TL |
166 | optional_yield y, |
167 | const DoutPrefixProvider *dpp) | |
9f95a23c TL |
168 | { |
169 | if (!_bucket) { | |
170 | *handler = svc.zone->get_sync_policy_handler(zone); | |
171 | return 0; | |
172 | } | |
173 | ||
1e59de90 TL |
174 | auto bucket = *_bucket; |
175 | ||
176 | if (bucket.bucket_id.empty()) { | |
177 | RGWBucketEntryPoint ep_info; | |
178 | int ret = svc.bucket_sobj->read_bucket_entrypoint_info(ctx.ep, | |
179 | RGWSI_Bucket::get_entrypoint_meta_key(bucket), | |
180 | &ep_info, | |
181 | nullptr, /* objv_tracker */ | |
182 | nullptr, /* mtime */ | |
183 | nullptr, /* attrs */ | |
184 | y, | |
185 | dpp, | |
186 | nullptr, /* cache_info */ | |
187 | boost::none /* refresh_version */); | |
188 | if (ret < 0) { | |
189 | if (ret != -ENOENT) { | |
190 | ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_info(bucket=" << bucket << ") returned r=" << ret << dendl; | |
191 | } | |
192 | return ret; | |
193 | } | |
194 | ||
195 | bucket = ep_info.bucket; | |
196 | } | |
9f95a23c TL |
197 | |
198 | string zone_key; | |
199 | string bucket_key; | |
200 | ||
201 | if (zone && *zone != svc.zone->zone_id()) { | |
202 | zone_key = zone->id; | |
203 | } | |
204 | ||
205 | bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket); | |
206 | ||
207 | string cache_key("bi/" + zone_key + "/" + bucket_key); | |
208 | ||
209 | if (auto e = sync_policy_cache->find(cache_key)) { | |
210 | *handler = e->handler; | |
211 | return 0; | |
212 | } | |
213 | ||
214 | bucket_sync_policy_cache_entry e; | |
215 | rgw_cache_entry_info cache_info; | |
216 | ||
217 | RGWBucketInfo bucket_info; | |
218 | map<string, bufferlist> attrs; | |
219 | ||
220 | int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi, | |
221 | bucket_key, | |
222 | &bucket_info, | |
223 | nullptr, | |
224 | &attrs, | |
225 | y, | |
b3b6e05e | 226 | dpp, |
9f95a23c TL |
227 | &cache_info); |
228 | if (r < 0) { | |
229 | if (r != -ENOENT) { | |
b3b6e05e | 230 | ldpp_dout(dpp, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl; |
9f95a23c TL |
231 | } |
232 | return r; | |
233 | } | |
234 | ||
235 | auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone); | |
236 | if (!zone_policy_handler) { | |
b3b6e05e | 237 | ldpp_dout(dpp, 20) << "ERROR: could not find policy handler for zone=" << zone << dendl; |
9f95a23c TL |
238 | return -ENOENT; |
239 | } | |
240 | ||
241 | e.handler.reset(zone_policy_handler->alloc_child(bucket_info, std::move(attrs))); | |
242 | ||
b3b6e05e | 243 | r = e.handler->init(dpp, y); |
9f95a23c | 244 | if (r < 0) { |
b3b6e05e | 245 | ldpp_dout(dpp, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl; |
9f95a23c TL |
246 | return r; |
247 | } | |
248 | ||
249 | temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler); | |
250 | ||
251 | rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket); | |
252 | ||
253 | r = resolve_policy_hints(ctx, self_entity, | |
254 | e.handler, | |
255 | zone_policy_handler, | |
b3b6e05e | 256 | temp_map, y, dpp); |
9f95a23c | 257 | if (r < 0) { |
b3b6e05e | 258 | ldpp_dout(dpp, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl; |
9f95a23c TL |
259 | return r; |
260 | } | |
261 | ||
b3b6e05e TL |
262 | if (!sync_policy_cache->put(dpp, svc.cache, cache_key, &e, {&cache_info})) { |
263 | ldpp_dout(dpp, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; | |
9f95a23c TL |
264 | } |
265 | ||
266 | *handler = e.handler; | |
267 | ||
268 | return 0; | |
269 | } | |
270 | ||
271 | int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, | |
272 | std::optional<rgw_zone_id> zone, | |
273 | std::optional<rgw_bucket> _bucket, | |
274 | RGWBucketSyncPolicyHandlerRef *handler, | |
b3b6e05e TL |
275 | optional_yield y, |
276 | const DoutPrefixProvider *dpp) | |
9f95a23c TL |
277 | { |
278 | std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef> temp_map; | |
b3b6e05e | 279 | return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y, dpp); |
9f95a23c TL |
280 | } |
281 | ||
282 | static bool diff_sets(std::set<rgw_bucket>& orig_set, | |
283 | std::set<rgw_bucket>& new_set, | |
284 | vector<rgw_bucket> *added, | |
285 | vector<rgw_bucket> *removed) | |
286 | { | |
287 | auto oiter = orig_set.begin(); | |
288 | auto niter = new_set.begin(); | |
289 | ||
290 | while (oiter != orig_set.end() && | |
291 | niter != new_set.end()) { | |
292 | if (*oiter == *niter) { | |
293 | ++oiter; | |
294 | ++niter; | |
295 | continue; | |
1e59de90 | 296 | } else if (*oiter < *niter) { |
9f95a23c TL |
297 | removed->push_back(*oiter); |
298 | ++oiter; | |
1e59de90 | 299 | } else { |
9f95a23c TL |
300 | added->push_back(*niter); |
301 | ++niter; | |
302 | } | |
303 | } | |
304 | for (; oiter != orig_set.end(); ++oiter) { | |
305 | removed->push_back(*oiter); | |
306 | } | |
307 | for (; niter != new_set.end(); ++niter) { | |
308 | added->push_back(*niter); | |
309 | } | |
310 | ||
311 | return !(removed->empty() && added->empty()); | |
312 | } | |
313 | ||
314 | ||
315 | class RGWSI_BS_SObj_HintIndexObj | |
316 | { | |
317 | friend class RGWSI_Bucket_Sync_SObj; | |
318 | ||
319 | CephContext *cct; | |
320 | struct { | |
321 | RGWSI_SysObj *sysobj; | |
322 | } svc; | |
323 | ||
9f95a23c TL |
324 | rgw_raw_obj obj; |
325 | RGWSysObj sysobj; | |
326 | ||
327 | RGWObjVersionTracker ot; | |
328 | ||
329 | bool has_data{false}; | |
330 | ||
331 | public: | |
332 | struct bi_entry { | |
333 | rgw_bucket bucket; | |
334 | map<rgw_bucket /* info_source */, obj_version> sources; | |
335 | ||
336 | void encode(bufferlist& bl) const { | |
337 | ENCODE_START(1, 1, bl); | |
338 | encode(bucket, bl); | |
339 | encode(sources, bl); | |
340 | ENCODE_FINISH(bl); | |
341 | } | |
342 | ||
343 | void decode(bufferlist::const_iterator& bl) { | |
344 | DECODE_START(1, bl); | |
345 | decode(bucket, bl); | |
346 | decode(sources, bl); | |
347 | DECODE_FINISH(bl); | |
348 | } | |
349 | ||
350 | bool add(const rgw_bucket& info_source, | |
351 | const obj_version& info_source_ver) { | |
352 | auto& ver = sources[info_source]; | |
353 | ||
354 | if (ver == info_source_ver) { /* already updated */ | |
355 | return false; | |
356 | } | |
357 | ||
358 | if (info_source_ver.tag == ver.tag && | |
359 | info_source_ver.ver < ver.ver) { | |
360 | return false; | |
361 | } | |
362 | ||
363 | ver = info_source_ver; | |
364 | ||
365 | return true; | |
366 | } | |
367 | ||
368 | bool remove(const rgw_bucket& info_source, | |
369 | const obj_version& info_source_ver) { | |
370 | auto iter = sources.find(info_source); | |
371 | if (iter == sources.end()) { | |
372 | return false; | |
373 | } | |
374 | ||
375 | auto& ver = iter->second; | |
376 | ||
377 | if (info_source_ver.tag == ver.tag && | |
378 | info_source_ver.ver < ver.ver) { | |
379 | return false; | |
380 | } | |
381 | ||
382 | sources.erase(info_source); | |
383 | return true; | |
384 | } | |
385 | ||
386 | bool empty() const { | |
387 | return sources.empty(); | |
388 | } | |
389 | }; | |
390 | ||
391 | struct single_instance_info { | |
392 | map<rgw_bucket, bi_entry> entries; | |
393 | ||
394 | void encode(bufferlist& bl) const { | |
395 | ENCODE_START(1, 1, bl); | |
396 | encode(entries, bl); | |
397 | ENCODE_FINISH(bl); | |
398 | } | |
399 | ||
400 | void decode(bufferlist::const_iterator& bl) { | |
401 | DECODE_START(1, bl); | |
402 | decode(entries, bl); | |
403 | DECODE_FINISH(bl); | |
404 | } | |
405 | ||
406 | bool add_entry(const rgw_bucket& info_source, | |
407 | const obj_version& info_source_ver, | |
408 | const rgw_bucket& bucket) { | |
409 | auto& entry = entries[bucket]; | |
410 | ||
411 | if (!entry.add(info_source, info_source_ver)) { | |
412 | return false; | |
413 | } | |
414 | ||
415 | entry.bucket = bucket; | |
416 | ||
417 | return true; | |
418 | } | |
419 | ||
420 | bool remove_entry(const rgw_bucket& info_source, | |
421 | const obj_version& info_source_ver, | |
422 | const rgw_bucket& bucket) { | |
423 | auto iter = entries.find(bucket); | |
424 | if (iter == entries.end()) { | |
425 | return false; | |
426 | } | |
427 | ||
428 | if (!iter->second.remove(info_source, info_source_ver)) { | |
429 | return false; | |
430 | } | |
431 | ||
432 | if (iter->second.empty()) { | |
433 | entries.erase(iter); | |
434 | } | |
435 | ||
436 | return true; | |
437 | } | |
438 | ||
439 | void clear() { | |
440 | entries.clear(); | |
441 | } | |
442 | ||
443 | bool empty() const { | |
444 | return entries.empty(); | |
445 | } | |
446 | ||
447 | void get_entities(std::set<rgw_bucket> *result) const { | |
448 | for (auto& iter : entries) { | |
449 | result->insert(iter.first); | |
450 | } | |
451 | } | |
452 | }; | |
453 | ||
454 | struct info_map { | |
455 | map<rgw_bucket, single_instance_info> instances; | |
456 | ||
457 | void encode(bufferlist& bl) const { | |
458 | ENCODE_START(1, 1, bl); | |
459 | encode(instances, bl); | |
460 | ENCODE_FINISH(bl); | |
461 | } | |
462 | ||
463 | void decode(bufferlist::const_iterator& bl) { | |
464 | DECODE_START(1, bl); | |
465 | decode(instances, bl); | |
466 | DECODE_FINISH(bl); | |
467 | } | |
468 | ||
469 | bool empty() const { | |
470 | return instances.empty(); | |
471 | } | |
472 | ||
473 | void clear() { | |
474 | instances.clear(); | |
475 | } | |
476 | ||
477 | void get_entities(const rgw_bucket& bucket, | |
478 | std::set<rgw_bucket> *result) const { | |
479 | auto iter = instances.find(bucket); | |
480 | if (iter == instances.end()) { | |
481 | return; | |
482 | } | |
483 | iter->second.get_entities(result); | |
484 | } | |
485 | } info; | |
486 | ||
487 | RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc, | |
488 | const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()), | |
9f95a23c | 489 | obj(_obj), |
1e59de90 | 490 | sysobj(_sysobj_svc->get_obj(obj)) |
9f95a23c TL |
491 | { |
492 | svc.sysobj = _sysobj_svc; | |
493 | } | |
494 | ||
495 | template <typename C1, typename C2> | |
b3b6e05e TL |
496 | int update(const DoutPrefixProvider *dpp, |
497 | const rgw_bucket& entity, | |
9f95a23c TL |
498 | const RGWBucketInfo& info_source, |
499 | C1 *add, | |
500 | C2 *remove, | |
501 | optional_yield y); | |
502 | ||
503 | private: | |
504 | template <typename C1, typename C2> | |
505 | void update_entries(const rgw_bucket& info_source, | |
506 | const obj_version& info_source_ver, | |
507 | C1 *add, | |
508 | C2 *remove, | |
509 | single_instance_info *instance); | |
510 | ||
b3b6e05e TL |
511 | int read(const DoutPrefixProvider *dpp, optional_yield y); |
512 | int flush(const DoutPrefixProvider *dpp, optional_yield y); | |
9f95a23c TL |
513 | |
514 | void invalidate() { | |
515 | has_data = false; | |
516 | info.clear(); | |
517 | } | |
518 | ||
519 | void get_entities(const rgw_bucket& bucket, | |
520 | std::set<rgw_bucket> *result) const { | |
521 | info.get_entities(bucket, result); | |
522 | } | |
523 | }; | |
524 | WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry) | |
525 | WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info) | |
526 | WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map) | |
527 | ||
528 | template <typename C1, typename C2> | |
b3b6e05e TL |
529 | int RGWSI_BS_SObj_HintIndexObj::update(const DoutPrefixProvider *dpp, |
530 | const rgw_bucket& entity, | |
9f95a23c TL |
531 | const RGWBucketInfo& info_source, |
532 | C1 *add, | |
533 | C2 *remove, | |
534 | optional_yield y) | |
535 | { | |
536 | int r = 0; | |
537 | ||
538 | auto& info_source_ver = info_source.objv_tracker.read_version; | |
539 | ||
540 | #define MAX_RETRIES 25 | |
541 | ||
542 | for (int i = 0; i < MAX_RETRIES; ++i) { | |
543 | if (!has_data) { | |
b3b6e05e | 544 | r = read(dpp, y); |
9f95a23c | 545 | if (r < 0) { |
b3b6e05e | 546 | ldpp_dout(dpp, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl; |
9f95a23c TL |
547 | return r; |
548 | } | |
549 | } | |
550 | ||
551 | auto& instance = info.instances[entity]; | |
552 | ||
553 | update_entries(info_source.bucket, | |
554 | info_source_ver, | |
555 | add, remove, | |
556 | &instance); | |
557 | ||
558 | if (instance.empty()) { | |
559 | info.instances.erase(entity); | |
560 | } | |
561 | ||
b3b6e05e | 562 | r = flush(dpp, y); |
9f95a23c TL |
563 | if (r >= 0) { |
564 | return 0; | |
565 | } | |
566 | ||
567 | if (r != -ECANCELED) { | |
b3b6e05e | 568 | ldpp_dout(dpp, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl; |
9f95a23c TL |
569 | return r; |
570 | } | |
571 | ||
572 | invalidate(); | |
573 | } | |
b3b6e05e | 574 | ldpp_dout(dpp, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl; |
9f95a23c TL |
575 | |
576 | return -EIO; | |
577 | } | |
578 | ||
579 | template <typename C1, typename C2> | |
580 | void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source, | |
581 | const obj_version& info_source_ver, | |
582 | C1 *add, | |
583 | C2 *remove, | |
584 | single_instance_info *instance) | |
585 | { | |
586 | if (remove) { | |
587 | for (auto& bucket : *remove) { | |
588 | instance->remove_entry(info_source, info_source_ver, bucket); | |
589 | } | |
590 | } | |
591 | ||
592 | if (add) { | |
593 | for (auto& bucket : *add) { | |
594 | instance->add_entry(info_source, info_source_ver, bucket); | |
595 | } | |
596 | } | |
597 | } | |
598 | ||
b3b6e05e | 599 | int RGWSI_BS_SObj_HintIndexObj::read(const DoutPrefixProvider *dpp, optional_yield y) { |
9f95a23c TL |
600 | RGWObjVersionTracker _ot; |
601 | bufferlist bl; | |
602 | int r = sysobj.rop() | |
603 | .set_objv_tracker(&_ot) /* forcing read of current version */ | |
b3b6e05e | 604 | .read(dpp, &bl, y); |
9f95a23c | 605 | if (r < 0 && r != -ENOENT) { |
b3b6e05e | 606 | ldpp_dout(dpp, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl; |
9f95a23c TL |
607 | return r; |
608 | } | |
609 | ||
610 | ot = _ot; | |
611 | ||
612 | if (r >= 0) { | |
613 | auto iter = bl.cbegin(); | |
614 | try { | |
615 | decode(info, iter); | |
616 | has_data = true; | |
617 | } catch (buffer::error& err) { | |
b3b6e05e | 618 | ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl; |
9f95a23c TL |
619 | info.clear(); |
620 | } | |
621 | } else { | |
622 | info.clear(); | |
623 | } | |
624 | ||
625 | return 0; | |
626 | } | |
627 | ||
b3b6e05e | 628 | int RGWSI_BS_SObj_HintIndexObj::flush(const DoutPrefixProvider *dpp, optional_yield y) { |
9f95a23c TL |
629 | int r; |
630 | ||
631 | if (!info.empty()) { | |
632 | bufferlist bl; | |
633 | encode(info, bl); | |
634 | ||
635 | r = sysobj.wop() | |
636 | .set_objv_tracker(&ot) /* forcing read of current version */ | |
b3b6e05e | 637 | .write(dpp, bl, y); |
9f95a23c TL |
638 | |
639 | } else { /* remove */ | |
640 | r = sysobj.wop() | |
641 | .set_objv_tracker(&ot) | |
b3b6e05e | 642 | .remove(dpp, y); |
9f95a23c TL |
643 | } |
644 | ||
645 | if (r < 0) { | |
646 | return r; | |
647 | } | |
648 | ||
649 | return 0; | |
650 | } | |
651 | ||
652 | rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const | |
653 | { | |
654 | rgw_bucket b = bucket; | |
655 | b.bucket_id.clear(); | |
656 | return rgw_raw_obj(svc.zone->get_zone_params().log_pool, | |
657 | bucket_sync_sources_oid_prefix + "." + b.get_key()); | |
658 | } | |
659 | ||
660 | rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const | |
661 | { | |
662 | rgw_bucket b = bucket; | |
663 | b.bucket_id.clear(); | |
664 | return rgw_raw_obj(svc.zone->get_zone_params().log_pool, | |
665 | bucket_sync_targets_oid_prefix + "." + b.get_key()); | |
666 | } | |
667 | ||
668 | template <typename C1, typename C2> | |
b3b6e05e TL |
669 | int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const DoutPrefixProvider *dpp, |
670 | const RGWBucketInfo& bucket_info, | |
9f95a23c TL |
671 | C1& added_dests, |
672 | C2& removed_dests, | |
673 | C1& added_sources, | |
674 | C2& removed_sources, | |
675 | optional_yield y) | |
676 | { | |
677 | C1 self_entity = { bucket_info.bucket }; | |
678 | ||
679 | if (!added_dests.empty() || | |
680 | !removed_dests.empty()) { | |
681 | /* update our dests */ | |
682 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
683 | get_dests_obj(bucket_info.bucket)); | |
b3b6e05e | 684 | int r = index.update(dpp, bucket_info.bucket, |
9f95a23c TL |
685 | bucket_info, |
686 | &added_dests, | |
687 | &removed_dests, | |
688 | y); | |
689 | if (r < 0) { | |
b3b6e05e | 690 | ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; |
9f95a23c TL |
691 | return r; |
692 | } | |
693 | ||
694 | /* update dest buckets */ | |
695 | for (auto& dest_bucket : added_dests) { | |
696 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
697 | get_sources_obj(dest_bucket)); | |
b3b6e05e | 698 | int r = dep_index.update(dpp, dest_bucket, |
9f95a23c TL |
699 | bucket_info, |
700 | &self_entity, | |
701 | static_cast<C2 *>(nullptr), | |
702 | y); | |
703 | if (r < 0) { | |
b3b6e05e | 704 | ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; |
9f95a23c TL |
705 | return r; |
706 | } | |
707 | } | |
708 | /* update removed dest buckets */ | |
709 | for (auto& dest_bucket : removed_dests) { | |
710 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
711 | get_sources_obj(dest_bucket)); | |
b3b6e05e | 712 | int r = dep_index.update(dpp, dest_bucket, |
9f95a23c TL |
713 | bucket_info, |
714 | static_cast<C1 *>(nullptr), | |
715 | &self_entity, | |
716 | y); | |
717 | if (r < 0) { | |
b3b6e05e | 718 | ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; |
9f95a23c TL |
719 | return r; |
720 | } | |
721 | } | |
722 | } | |
723 | ||
724 | if (!added_sources.empty() || | |
725 | !removed_sources.empty()) { | |
726 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
727 | get_sources_obj(bucket_info.bucket)); | |
728 | /* update our sources */ | |
b3b6e05e | 729 | int r = index.update(dpp, bucket_info.bucket, |
9f95a23c TL |
730 | bucket_info, |
731 | &added_sources, | |
732 | &removed_sources, | |
733 | y); | |
734 | if (r < 0) { | |
b3b6e05e | 735 | ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; |
9f95a23c TL |
736 | return r; |
737 | } | |
738 | ||
739 | /* update added sources buckets */ | |
740 | for (auto& source_bucket : added_sources) { | |
741 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
742 | get_dests_obj(source_bucket)); | |
b3b6e05e | 743 | int r = dep_index.update(dpp, source_bucket, |
9f95a23c TL |
744 | bucket_info, |
745 | &self_entity, | |
746 | static_cast<C2 *>(nullptr), | |
747 | y); | |
748 | if (r < 0) { | |
b3b6e05e | 749 | ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; |
9f95a23c TL |
750 | return r; |
751 | } | |
752 | } | |
753 | /* update removed dest buckets */ | |
754 | for (auto& source_bucket : removed_sources) { | |
755 | RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, | |
756 | get_dests_obj(source_bucket)); | |
b3b6e05e | 757 | int r = dep_index.update(dpp, source_bucket, |
9f95a23c TL |
758 | bucket_info, |
759 | static_cast<C1 *>(nullptr), | |
760 | &self_entity, | |
761 | y); | |
762 | if (r < 0) { | |
b3b6e05e | 763 | ldpp_dout(dpp, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; |
9f95a23c TL |
764 | return r; |
765 | } | |
766 | } | |
767 | } | |
768 | ||
769 | return 0; | |
770 | } | |
771 | ||
b3b6e05e TL |
772 | int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const DoutPrefixProvider *dpp, |
773 | const RGWBucketInfo& bucket_info, | |
9f95a23c TL |
774 | optional_yield y) |
775 | { | |
776 | std::set<rgw_bucket> sources_set; | |
777 | std::set<rgw_bucket> dests_set; | |
778 | ||
779 | if (bucket_info.sync_policy) { | |
780 | bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket, | |
781 | &sources_set, | |
782 | &dests_set); | |
783 | } | |
784 | ||
785 | std::vector<rgw_bucket> removed_sources; | |
786 | removed_sources.reserve(sources_set.size()); | |
787 | for (auto& e : sources_set) { | |
788 | removed_sources.push_back(e); | |
789 | } | |
790 | ||
791 | std::vector<rgw_bucket> removed_dests; | |
792 | removed_dests.reserve(dests_set.size()); | |
793 | for (auto& e : dests_set) { | |
794 | removed_dests.push_back(e); | |
795 | } | |
796 | ||
797 | std::vector<rgw_bucket> added_sources; | |
798 | std::vector<rgw_bucket> added_dests; | |
799 | ||
b3b6e05e | 800 | return hint_index_mgr->update_hints(dpp, bucket_info, |
9f95a23c TL |
801 | added_dests, |
802 | removed_dests, | |
803 | added_sources, | |
804 | removed_sources, | |
805 | y); | |
806 | } | |
807 | ||
b3b6e05e TL |
808 | int RGWSI_Bucket_Sync_SObj::handle_bi_update(const DoutPrefixProvider *dpp, |
809 | RGWBucketInfo& bucket_info, | |
9f95a23c TL |
810 | RGWBucketInfo *orig_bucket_info, |
811 | optional_yield y) | |
812 | { | |
813 | std::set<rgw_bucket> orig_sources; | |
814 | std::set<rgw_bucket> orig_dests; | |
815 | ||
816 | if (orig_bucket_info && | |
817 | orig_bucket_info->sync_policy) { | |
818 | orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket, | |
819 | &orig_sources, | |
820 | &orig_dests); | |
821 | } | |
822 | ||
823 | std::set<rgw_bucket> sources; | |
824 | std::set<rgw_bucket> dests; | |
825 | if (bucket_info.sync_policy) { | |
826 | bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket, | |
827 | &sources, | |
828 | &dests); | |
829 | } | |
830 | ||
831 | std::vector<rgw_bucket> removed_sources; | |
832 | std::vector<rgw_bucket> added_sources; | |
833 | bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources); | |
b3b6e05e TL |
834 | ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl; |
835 | ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl; | |
9f95a23c TL |
836 | |
837 | std::vector<rgw_bucket> removed_dests; | |
838 | std::vector<rgw_bucket> added_dests; | |
839 | found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests); | |
840 | ||
b3b6e05e TL |
841 | ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl; |
842 | ldpp_dout(dpp, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl; | |
9f95a23c TL |
843 | |
844 | if (!found) { | |
845 | return 0; | |
846 | } | |
847 | ||
b3b6e05e | 848 | return hint_index_mgr->update_hints(dpp, bucket_info, |
9f95a23c TL |
849 | dests, /* set all dests, not just the ones that were added */ |
850 | removed_dests, | |
851 | sources, /* set all sources, not just that the ones that were added */ | |
852 | removed_sources, | |
853 | y); | |
854 | } | |
855 | ||
b3b6e05e TL |
856 | int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const DoutPrefixProvider *dpp, |
857 | const rgw_bucket& bucket, | |
9f95a23c TL |
858 | std::set<rgw_bucket> *sources, |
859 | std::set<rgw_bucket> *dests, | |
860 | optional_yield y) | |
861 | { | |
862 | if (!sources && !dests) { | |
863 | return 0; | |
864 | } | |
865 | ||
866 | if (sources) { | |
867 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
868 | hint_index_mgr->get_sources_obj(bucket)); | |
b3b6e05e | 869 | int r = index.read(dpp, y); |
9f95a23c | 870 | if (r < 0) { |
b3b6e05e | 871 | ldpp_dout(dpp, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl; |
9f95a23c TL |
872 | return r; |
873 | } | |
874 | ||
875 | index.get_entities(bucket, sources); | |
876 | ||
877 | if (!bucket.bucket_id.empty()) { | |
878 | rgw_bucket b = bucket; | |
879 | b.bucket_id.clear(); | |
880 | index.get_entities(b, sources); | |
881 | } | |
882 | } | |
883 | ||
884 | if (dests) { | |
885 | RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, | |
886 | hint_index_mgr->get_dests_obj(bucket)); | |
b3b6e05e | 887 | int r = index.read(dpp, y); |
9f95a23c | 888 | if (r < 0) { |
b3b6e05e | 889 | ldpp_dout(dpp, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl; |
9f95a23c TL |
890 | return r; |
891 | } | |
892 | ||
893 | index.get_entities(bucket, dests); | |
894 | ||
895 | if (!bucket.bucket_id.empty()) { | |
896 | rgw_bucket b = bucket; | |
897 | b.bucket_id.clear(); | |
898 | index.get_entities(b, dests); | |
899 | } | |
900 | } | |
901 | ||
902 | return 0; | |
903 | } |