]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_bucket_sync.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_bucket_sync.cc
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
9f95a23c
TL
3
4#include "rgw_common.h"
5#include "rgw_bucket_sync.h"
6#include "rgw_data_sync.h"
7#include "rgw_zone.h"
8
9#include "services/svc_zone.h"
10#include "services/svc_bucket_sync.h"
11
12#define dout_subsys ceph_subsys_rgw
13
20effc67 14using namespace std;
9f95a23c
TL
15
16ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
17 os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or(rgw_zone_id()) << ",az=" << (int)e.all_zones << "}";
18 return os;
19}
20
21ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
22 os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
23 return os;
24}
25
26ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
27 os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<rgw_zone_id>()) << "}";
28 return os;
29}
30
31ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
32 os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
33 return os;
34}
35
36static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
37 const rgw_zone_id& source_zone,
38 const rgw_zone_id& dest_zone)
39{
40 std::vector<rgw_sync_bucket_pipe> relevant_pipes;
41 for (auto& p : pipes) {
42 if (p.source.match_zone(source_zone) &&
43 p.dest.match_zone(dest_zone)) {
44 for (auto pipe : p.expand()) {
45 pipe.source.apply_zone(source_zone);
46 pipe.dest.apply_zone(dest_zone);
47 relevant_pipes.push_back(pipe);
48 }
49 }
50 }
51
52 return relevant_pipes;
53}
54
55static bool is_wildcard_bucket(const rgw_bucket& bucket)
56{
57 return bucket.name.empty();
58}
59
60void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const
61{
62 encode_json("zone", zone.id, f);
63 encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f);
64 encode_json("sources", sources, f);
65 encode_json("dests", dests, f);
66}
67
68
69template <typename CB1, typename CB2>
70void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id& source_zone,
71 const rgw_zone_id& dest_zone,
72 const std::vector<rgw_sync_bucket_pipes>& pipes,
73 zb_pipe_map_t *pipe_map,
74 CB1 filter_cb,
75 CB2 call_filter_cb)
76{
77 if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
78 return;
79 }
80 auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
81
82 for (auto& pipe : relevant_pipes) {
83 rgw_sync_bucket_entity zb;
84 if (!call_filter_cb(pipe, &zb)) {
85 continue;
86 }
87 pipe_map->insert(make_pair(zb, pipe));
88 }
89}
90
91template <typename CB>
92void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id& source_zone,
93 const rgw_zone_id& dest_zone,
94 const std::vector<rgw_sync_bucket_pipes>& pipes,
95 CB filter_cb)
96{
97 return try_add_to_pipe_map(source_zone, dest_zone, pipes,
98 &sources,
99 filter_cb,
100 [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) {
101 *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()};
102 return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
103 });
104}
105
106template <typename CB>
107void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id& source_zone,
108 const rgw_zone_id& dest_zone,
109 const std::vector<rgw_sync_bucket_pipes>& pipes,
110 CB filter_cb)
111{
112 return try_add_to_pipe_map(source_zone, dest_zone, pipes,
113 &dests,
114 filter_cb,
115 [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) {
116 *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()};
117 return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
118 });
119}
120
121using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t;
122
123pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m,
124 const rgw_zone_id& zone,
125 std::optional<rgw_bucket> b) const
126{
127 if (!b) {
128 return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()});
129 }
130
131 auto zb = rgw_sync_bucket_entity{zone, *b};
132
133 auto range = m.equal_range(zb);
134 if (range.first == range.second &&
135 !is_wildcard_bucket(*b)) {
136 /* couldn't find the specific bucket, try to find by wildcard */
137 zb.bucket = rgw_bucket();
138 range = m.equal_range(zb);
139 }
140
141 return range;
142}
143
144
145template <typename CB>
20effc67
TL
146void rgw_sync_group_pipe_map::init(const DoutPrefixProvider *dpp,
147 CephContext *cct,
9f95a23c
TL
148 const rgw_zone_id& _zone,
149 std::optional<rgw_bucket> _bucket,
150 const rgw_sync_policy_group& group,
151 rgw_sync_data_flow_group *_default_flow,
152 std::set<rgw_zone_id> *_pall_zones,
153 CB filter_cb) {
154 zone = _zone;
155 bucket = _bucket;
156 default_flow = _default_flow;
157 pall_zones = _pall_zones;
158
159 rgw_sync_bucket_entity zb(zone, bucket);
160
161 status = group.status;
162
163 std::vector<rgw_sync_bucket_pipes> zone_pipes;
164
165 string bucket_key = (bucket ? bucket->get_key() : "*");
166
167 /* only look at pipes that touch the specific zone and bucket */
168 for (auto& pipe : group.pipes) {
169 if (pipe.contains_zone_bucket(zone, bucket)) {
20effc67 170 ldpp_dout(dpp, 20) << __func__ << "(): pipe_map (zone=" << zone << " bucket=" << bucket_key << "): adding potential pipe: " << pipe << dendl;
9f95a23c
TL
171 zone_pipes.push_back(pipe);
172 }
173 }
174
175 const rgw_sync_data_flow_group *pflow;
176
177 if (!group.data_flow.empty()) {
178 pflow = &group.data_flow;
179 } else {
180 if (!default_flow) {
181 return;
182 }
183 pflow = default_flow;
184 }
185
186 auto& flow = *pflow;
187
188 pall_zones->insert(zone);
189
190 /* symmetrical */
191 for (auto& symmetrical_group : flow.symmetrical) {
192 if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
193 for (auto& z : symmetrical_group.zones) {
194 if (z != zone) {
195 pall_zones->insert(z);
196 try_add_source(z, zone, zone_pipes, filter_cb);
197 try_add_dest(zone, z, zone_pipes, filter_cb);
198 }
199 }
200 }
201 }
202
203 /* directional */
204 for (auto& rule : flow.directional) {
205 if (rule.source_zone == zone) {
206 pall_zones->insert(rule.dest_zone);
207 try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
208 } else if (rule.dest_zone == zone) {
209 pall_zones->insert(rule.source_zone);
210 try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
211 }
212 }
213}
214
215/*
216 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
217 */
218vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id& source_zone,
219 std::optional<rgw_bucket> source_bucket,
220 std::optional<rgw_bucket> dest_bucket) const {
221 vector<rgw_sync_bucket_pipe> result;
222
223 auto range = find_pipes(sources, source_zone, source_bucket);
224
225 for (auto iter = range.first; iter != range.second; ++iter) {
226 auto pipe = iter->second;
227 if (pipe.dest.match_bucket(dest_bucket)) {
228 result.push_back(pipe);
229 }
230 }
231 return result;
232}
233
234/*
235 * find all relevant pipes in other zones that pull from a specific
236 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
237 */
238vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket,
239 const rgw_zone_id& dest_zone,
240 std::optional<rgw_bucket> dest_bucket) const {
241 vector<rgw_sync_bucket_pipe> result;
242
243 auto range = find_pipes(dests, dest_zone, dest_bucket);
244
245 for (auto iter = range.first; iter != range.second; ++iter) {
246 auto pipe = iter->second;
247 if (pipe.source.match_bucket(source_bucket)) {
248 result.push_back(pipe);
249 }
250 }
251
252 return result;
253}
254
255/*
256 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
257 */
258vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id& source_zone,
259 std::optional<rgw_bucket> source_bucket,
260 const rgw_zone_id& dest_zone,
261 std::optional<rgw_bucket> dest_bucket) const {
262 if (dest_zone == zone) {
263 return find_source_pipes(source_zone, source_bucket, dest_bucket);
264 }
265
266 if (source_zone == zone) {
267 return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
268 }
269
270 return vector<rgw_sync_bucket_pipe>();
271}
272
273void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
274{
275 pipes.push_back(pipe);
276
277 auto ppipe = &pipes.back();
278 auto prefix = ppipe->params.source.filter.prefix.value_or(string());
279
280 prefix_refs.insert(make_pair(prefix, ppipe));
281
282 for (auto& t : ppipe->params.source.filter.tags) {
283 string tag = t.key + "=" + t.value;
284 auto titer = tag_refs.find(tag);
285 if (titer != tag_refs.end() &&
286 ppipe->params.priority > titer->second->params.priority) {
287 titer->second = ppipe;
288 } else {
289 tag_refs[tag] = ppipe;
290 }
291 }
292}
293
294bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
295 std::optional<rgw_user> *user,
296 std::optional<rgw_user> *acl_translation_owner,
297 std::optional<string> *storage_class,
298 rgw_sync_pipe_params::Mode *mode,
299 bool *need_more_info) const
300{
301 std::optional<string> owner;
302
303 *need_more_info = false;
304
305 if (prefix_refs.empty()) {
306 return false;
307 }
308
309 auto end = prefix_refs.upper_bound(key.name);
310 auto iter = end;
311 if (iter != prefix_refs.begin()) {
312 --iter;
313 }
314 if (iter == prefix_refs.end()) {
315 return false;
316 }
317
318 if (iter != prefix_refs.begin()) {
319 iter = prefix_refs.find(iter->first); /* prefix_refs is multimap, find first element
320 holding that key */
321 }
322
323 std::vector<decltype(iter)> iters;
324
325 std::optional<int> priority;
326
327 for (; iter != end; ++iter) {
328 auto& prefix = iter->first;
329 if (!boost::starts_with(key.name, prefix)) {
330 continue;
331 }
332
333 auto& rule_params = iter->second->params;
334 auto& filter = rule_params.source.filter;
335
336 if (rule_params.priority > priority) {
337 priority = rule_params.priority;
338
339 if (!filter.has_tags()) {
340 iters.clear();
341 }
342 iters.push_back(iter);
343
344 *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then
345 we can't be sure if it would be used.
346 We need to first read the info from the source object */
347 }
348 }
349
350 if (iters.empty()) {
351 return false;
352 }
353
9f95a23c
TL
354 std::optional<rgw_user> _user;
355 std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
356 std::optional<string> _storage_class;
20effc67 357 rgw_sync_pipe_params::Mode _mode{rgw_sync_pipe_params::Mode::MODE_SYSTEM};
9f95a23c 358
20effc67
TL
359 // make sure all params are the same by saving the first one
360 // encountered and comparing all subsequent to it
361 bool first_iter = true;
9f95a23c 362 for (auto& iter : iters) {
20effc67
TL
363 const rgw_sync_pipe_params& rule_params = iter->second->params;
364 if (first_iter) {
9f95a23c
TL
365 _user = rule_params.user;
366 _acl_translation = rule_params.dest.acl_translation;
367 _storage_class = rule_params.dest.storage_class;
368 _mode = rule_params.mode;
20effc67
TL
369 first_iter = false;
370 } else {
371 // note: three of these == operators are comparing std::optional
372 // against std::optional; as one would expect they are equal a)
373 // if both do not contain values or b) if both do and those
374 // contained values are the same
375 const bool conflict =
376 !(_user == rule_params.user &&
377 _acl_translation == rule_params.dest.acl_translation &&
378 _storage_class == rule_params.dest.storage_class &&
379 _mode == rule_params.mode);
380 if (conflict) {
381 *need_more_info = true;
382 return false;
383 }
9f95a23c
TL
384 }
385 }
386
387 *user = _user;
388 if (_acl_translation) {
389 *acl_translation_owner = _acl_translation->owner;
390 }
391 *storage_class = _storage_class;
392 *mode = _mode;
393
394 return true;
395}
396
397bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
398 const RGWObjTags::tag_map_t& tags,
399 rgw_sync_pipe_params *params) const
400{
401 if (prefix_refs.empty()) {
402 return false;
403 }
404
405 auto iter = prefix_refs.upper_bound(key.name);
406 if (iter != prefix_refs.begin()) {
407 --iter;
408 }
409 if (iter == prefix_refs.end()) {
410 return false;
411 }
412
413 auto end = prefix_refs.upper_bound(key.name);
414 auto max = end;
415
416 std::optional<int> priority;
417
418 for (; iter != end; ++iter) {
419 /* NOTE: this is not the most efficient way to do it,
420 * a trie data structure would be better
421 */
422 auto& prefix = iter->first;
423 if (!boost::starts_with(key.name, prefix)) {
424 continue;
425 }
426
427 auto& rule_params = iter->second->params;
428 auto& filter = rule_params.source.filter;
429
430 if (!filter.check_tags(tags)) {
431 continue;
432 }
433
434 if (rule_params.priority > priority) {
435 priority = rule_params.priority;
436 max = iter;
437 }
438 }
439
440 if (max == end) {
441 return false;
442 }
443
444 *params = max->second->params;
445 return true;
446}
447
448/*
449 * return either the current prefix for s, or the next one if s is not within a prefix
450 */
451
452RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const
453{
454 if (prefix_refs.empty()) {
455 return prefix_refs.end();
456 }
457 auto next = prefix_refs.upper_bound(s);
458 auto iter = next;
459 if (iter != prefix_refs.begin()) {
460 --iter;
461 }
462 if (!boost::starts_with(s, iter->first)) {
463 return next;
464 }
465
466 return iter;
467}
468
469void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
470 pipe_map.insert(make_pair(pipe.id, pipe));
471
472 auto& rules_ref = rules[endpoints_pair(pipe)];
473
474 if (!rules_ref) {
475 rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>();
476 }
477
478 rules_ref->insert(pipe);
479
480 pipe_handler h(rules_ref, pipe);
481
482 handlers.insert(h);
483}
484
485void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
486{
487 encode_json("pipes", pipe_map, f);
488}
489
490bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id& source_zone,
491 std::optional<rgw_bucket> source_bucket,
492 const rgw_zone_id& dest_zone,
493 std::optional<rgw_bucket> dest_bucket,
494 bool check_activated) const
495{
496 bool found = false;
497 bool found_activated = false;
498
499 for (auto m : flow_groups) {
500 auto& fm = m.second;
501 auto pipes = fm.find_pipes(source_zone, source_bucket,
502 dest_zone, dest_bucket);
503
504 bool is_found = !pipes.empty();
505
506 if (is_found) {
507 switch (fm.status) {
508 case rgw_sync_policy_group::Status::FORBIDDEN:
509 return false;
510 case rgw_sync_policy_group::Status::ENABLED:
511 found = true;
512 found_activated = true;
513 break;
514 case rgw_sync_policy_group::Status::ALLOWED:
515 found = true;
516 break;
517 default:
518 break; /* unknown -- ignore */
519 }
520 }
521 }
522
523 if (check_activated && found_activated) {
524 return true;
525 }
526
527 return found;
528}
529
20effc67 530void RGWBucketSyncFlowManager::init(const DoutPrefixProvider *dpp, const rgw_sync_policy_info& sync_policy) {
9f95a23c
TL
531 std::optional<rgw_sync_data_flow_group> default_flow;
532 if (parent) {
533 default_flow.emplace();
534 default_flow->init_default(parent->all_zones);
535 }
536
537 for (auto& item : sync_policy.groups) {
538 auto& group = item.second;
539 auto& flow_group_map = flow_groups[group.id];
540
20effc67 541 flow_group_map.init(dpp, cct, zone_id, bucket, group,
9f95a23c
TL
542 (default_flow ? &(*default_flow) : nullptr),
543 &all_zones,
544 [&](const rgw_zone_id& source_zone,
545 std::optional<rgw_bucket> source_bucket,
546 const rgw_zone_id& dest_zone,
547 std::optional<rgw_bucket> dest_bucket) {
548 if (!parent) {
549 return true;
550 }
551 return parent->allowed_data_flow(source_zone,
552 source_bucket,
553 dest_zone,
554 dest_bucket,
555 false); /* just check that it's not disabled */
556 });
557 }
558}
559
20effc67
TL
560void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
561 std::optional<rgw_bucket> effective_bucket,
9f95a23c
TL
562 RGWBucketSyncFlowManager::pipe_set *source_pipes,
563 RGWBucketSyncFlowManager::pipe_set *dest_pipes,
564 bool only_enabled) const
565
566{
567 string effective_bucket_key;
568 if (effective_bucket) {
569 effective_bucket_key = effective_bucket->get_key();
570 }
571 if (parent) {
20effc67 572 parent->reflect(dpp, effective_bucket, source_pipes, dest_pipes, only_enabled);
9f95a23c
TL
573 }
574
575 for (auto& item : flow_groups) {
576 auto& flow_group_map = item.second;
577
578 /* only return enabled groups */
579 if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
580 (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
581 continue;
582 }
583
584 for (auto& entry : flow_group_map.sources) {
585 rgw_sync_bucket_pipe pipe = entry.second;
586 if (!pipe.dest.match_bucket(effective_bucket)) {
587 continue;
588 }
589
590 pipe.source.apply_bucket(effective_bucket);
591 pipe.dest.apply_bucket(effective_bucket);
592
20effc67 593 ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
9f95a23c
TL
594 source_pipes->insert(pipe);
595 }
596
597 for (auto& entry : flow_group_map.dests) {
598 rgw_sync_bucket_pipe pipe = entry.second;
599
600 if (!pipe.source.match_bucket(effective_bucket)) {
601 continue;
602 }
603
604 pipe.source.apply_bucket(effective_bucket);
605 pipe.dest.apply_bucket(effective_bucket);
606
20effc67 607 ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
9f95a23c
TL
608 dest_pipes->insert(pipe);
609 }
610 }
611}
612
613
614RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext *_cct,
615 const rgw_zone_id& _zone_id,
616 std::optional<rgw_bucket> _bucket,
617 const RGWBucketSyncFlowManager *_parent) : cct(_cct),
618 zone_id(_zone_id),
619 bucket(_bucket),
620 parent(_parent) {}
621
622
623void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc,
624 RGWSI_SyncModules *sync_modules_svc,
625 rgw_sync_policy_info *ppolicy)
626{
627 bool found = false;
628
629 rgw_sync_policy_info policy;
630
631 auto& group = policy.groups["default"];
632 auto& zonegroup = zone_svc->get_zonegroup();
633
634 for (const auto& ziter1 : zonegroup.zones) {
635 auto& id1 = ziter1.first;
636 const RGWZone& z1 = ziter1.second;
637
638 for (const auto& ziter2 : zonegroup.zones) {
639 auto& id2 = ziter2.first;
640 const RGWZone& z2 = ziter2.second;
641
642 if (id1 == id2) {
643 continue;
644 }
645
646 if (z1.syncs_from(z2.name)) {
647 found = true;
648 rgw_sync_directional_rule *rule;
649 group.data_flow.find_or_create_directional(id2,
650 id1,
651 &rule);
652 }
653 }
654 }
655
656 if (!found) { /* nothing syncs */
657 return;
658 }
659
660 rgw_sync_bucket_pipes pipes;
661 pipes.id = "all";
662 pipes.source.all_zones = true;
663 pipes.dest.all_zones = true;
664
665 group.pipes.emplace_back(std::move(pipes));
666
667
668 group.status = rgw_sync_policy_group::Status::ENABLED;
669
670 *ppolicy = std::move(policy);
671}
672
673RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
674 RGWSI_SyncModules *sync_modules_svc,
675 RGWSI_Bucket_Sync *_bucket_sync_svc,
676 std::optional<rgw_zone_id> effective_zone) : zone_svc(_zone_svc) ,
677 bucket_sync_svc(_bucket_sync_svc) {
678 zone_id = effective_zone.value_or(zone_svc->zone_id());
679 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
680 zone_id,
681 nullopt,
682 nullptr));
683 sync_policy = zone_svc->get_zonegroup().sync_policy;
684
685 if (sync_policy.empty()) {
686 RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy);
687 legacy_config = true;
688 }
689}
690
691RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
692 const RGWBucketInfo& _bucket_info,
693 map<string, bufferlist>&& _bucket_attrs) : parent(_parent),
694 bucket_info(_bucket_info),
695 bucket_attrs(std::move(_bucket_attrs)) {
696 if (_bucket_info.sync_policy) {
697 sync_policy = *_bucket_info.sync_policy;
698
699 for (auto& entry : sync_policy.groups) {
700 for (auto& pipe : entry.second.pipes) {
701 if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER &&
702 pipe.params.user.empty()) {
703 pipe.params.user = _bucket_info.owner;
704 }
705 }
706 }
707 }
708 legacy_config = parent->legacy_config;
709 bucket = _bucket_info.bucket;
710 zone_svc = parent->zone_svc;
711 bucket_sync_svc = parent->bucket_sync_svc;
712 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
713 parent->zone_id,
714 _bucket_info.bucket,
715 parent->flow_mgr.get()));
716}
717
718RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
719 const rgw_bucket& _bucket,
720 std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) {
721 if (_sync_policy) {
722 sync_policy = *_sync_policy;
723 }
724 legacy_config = parent->legacy_config;
725 bucket = _bucket;
726 zone_svc = parent->zone_svc;
727 bucket_sync_svc = parent->bucket_sync_svc;
728 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
729 parent->zone_id,
730 _bucket,
731 parent->flow_mgr.get()));
732}
733
734RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info,
735 map<string, bufferlist>&& bucket_attrs) const
736{
737 return new RGWBucketSyncPolicyHandler(this, bucket_info, std::move(bucket_attrs));
738}
739
740RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket,
741 std::optional<rgw_sync_policy_info> sync_policy) const
742{
743 return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
744}
745
b3b6e05e 746int RGWBucketSyncPolicyHandler::init(const DoutPrefixProvider *dpp, optional_yield y)
9f95a23c 747{
b3b6e05e 748 int r = bucket_sync_svc->get_bucket_sync_hints(dpp, bucket.value_or(rgw_bucket()),
9f95a23c
TL
749 &source_hints,
750 &target_hints,
751 y);
752 if (r < 0) {
b3b6e05e 753 ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket="
9f95a23c
TL
754 << bucket << " returned r=" << r << dendl;
755 return r;
756 }
757
20effc67 758 flow_mgr->init(dpp, sync_policy);
9f95a23c 759
20effc67 760 reflect(dpp, &source_pipes,
9f95a23c
TL
761 &target_pipes,
762 &sources,
763 &targets,
764 &source_zones,
765 &target_zones,
766 true);
767
768 return 0;
769}
770
20effc67 771void RGWBucketSyncPolicyHandler::reflect(const DoutPrefixProvider *dpp, RGWBucketSyncFlowManager::pipe_set *psource_pipes,
9f95a23c
TL
772 RGWBucketSyncFlowManager::pipe_set *ptarget_pipes,
773 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources,
774 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets,
775 std::set<rgw_zone_id> *psource_zones,
776 std::set<rgw_zone_id> *ptarget_zones,
777 bool only_enabled) const
778{
779 RGWBucketSyncFlowManager::pipe_set _source_pipes;
780 RGWBucketSyncFlowManager::pipe_set _target_pipes;
781 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _sources;
782 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _targets;
783 std::set<rgw_zone_id> _source_zones;
784 std::set<rgw_zone_id> _target_zones;
785
20effc67 786 flow_mgr->reflect(dpp, bucket, &_source_pipes, &_target_pipes, only_enabled);
9f95a23c
TL
787
788 for (auto& entry : _source_pipes.pipe_map) {
789 auto& pipe = entry.second;
790 if (!pipe.source.zone) {
791 continue;
792 }
793 _source_zones.insert(*pipe.source.zone);
794 _sources[*pipe.source.zone].insert(pipe);
795 }
796
797 for (auto& entry : _target_pipes.pipe_map) {
798 auto& pipe = entry.second;
799 if (!pipe.dest.zone) {
800 continue;
801 }
802 _target_zones.insert(*pipe.dest.zone);
803 _targets[*pipe.dest.zone].insert(pipe);
804 }
805
806 if (psource_pipes) {
807 *psource_pipes = std::move(_source_pipes);
808 }
809 if (ptarget_pipes) {
810 *ptarget_pipes = std::move(_target_pipes);
811 }
812 if (psources) {
813 *psources = std::move(_sources);
814 }
815 if (ptargets) {
816 *ptargets = std::move(_targets);
817 }
818 if (psource_zones) {
819 *psource_zones = std::move(_source_zones);
820 }
821 if (ptarget_zones) {
822 *ptarget_zones = std::move(_target_zones);
823 }
824}
825
826multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const
827{
828 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
829
830 for (auto& source_entry : sources) {
831 auto& zone_id = source_entry.first;
832
833 auto& pipes = source_entry.second.pipe_map;
834
835 for (auto& entry : pipes) {
836 auto& pipe = entry.second;
837 m.insert(make_pair(zone_id, pipe));
838 }
839 }
840
841 for (auto& pipe : resolved_sources) {
842 if (!pipe.source.zone) {
843 continue;
844 }
845
846 m.insert(make_pair(*pipe.source.zone, pipe));
847 }
848
849 return m;
850}
851
852multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const
853{
854 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
855
856 for (auto& dest_entry : targets) {
857 auto& zone_id = dest_entry.first;
858
859 auto& pipes = dest_entry.second.pipe_map;
860
861 for (auto& entry : pipes) {
862 auto& pipe = entry.second;
863 m.insert(make_pair(zone_id, pipe));
864 }
865 }
866
867 for (auto& pipe : resolved_dests) {
868 if (!pipe.dest.zone) {
869 continue;
870 }
871
872 m.insert(make_pair(*pipe.dest.zone, pipe));
873 }
874
875 return m;
876}
877
878multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const
879{
880 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
881
882 auto iter = targets.find(zone_id);
883 if (iter != targets.end()) {
884 auto& pipes = iter->second.pipe_map;
885
886 for (auto& entry : pipes) {
887 auto& pipe = entry.second;
888 m.insert(make_pair(zone_id, pipe));
889 }
890 }
891
892 for (auto& pipe : resolved_dests) {
893 if (!pipe.dest.zone ||
894 *pipe.dest.zone != zone_id) {
895 continue;
896 }
897
898 m.insert(make_pair(*pipe.dest.zone, pipe));
899 }
900
901 return m;
902}
903
904void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
905 std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */
906 for (auto& entry : source_pipes.pipe_map) {
907 auto& source_pipe = entry.second;
908 if (!filter_peer ||
909 source_pipe.source.match(*filter_peer)) {
910 _sources->insert(source_pipe);
911 }
912 }
913
914 for (auto& entry : target_pipes.pipe_map) {
915 auto& target_pipe = entry.second;
916 if (!filter_peer ||
917 target_pipe.dest.match(*filter_peer)) {
918 _targets->insert(target_pipe);
919 }
920 }
921}
922
923bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
924{
925 if (!bucket) {
926 return false;
927 }
928
929 if (bucket_is_sync_source()) {
930 return true;
931 }
932
933 return (zone_svc->need_to_log_data() &&
934 bucket_info->datasync_flag_enabled());
935}
936
937bool RGWBucketSyncPolicyHandler::bucket_imports_data() const
938{
939 return bucket_is_sync_target();
940}
941