]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/driver/rados/rgw_bucket_sync.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / driver / rados / rgw_bucket_sync.cc
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
9f95a23c
TL
3
4#include "rgw_common.h"
5#include "rgw_bucket_sync.h"
6#include "rgw_data_sync.h"
7#include "rgw_zone.h"
8
9#include "services/svc_zone.h"
10#include "services/svc_bucket_sync.h"
11
12#define dout_subsys ceph_subsys_rgw
13
20effc67 14using namespace std;
9f95a23c
TL
15
16ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
17 os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or(rgw_zone_id()) << ",az=" << (int)e.all_zones << "}";
18 return os;
19}
20
21ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
22 os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
23 return os;
24}
25
26ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
27 os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<rgw_zone_id>()) << "}";
28 return os;
29}
30
31ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
32 os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
33 return os;
34}
35
36static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
37 const rgw_zone_id& source_zone,
38 const rgw_zone_id& dest_zone)
39{
40 std::vector<rgw_sync_bucket_pipe> relevant_pipes;
41 for (auto& p : pipes) {
42 if (p.source.match_zone(source_zone) &&
43 p.dest.match_zone(dest_zone)) {
44 for (auto pipe : p.expand()) {
45 pipe.source.apply_zone(source_zone);
46 pipe.dest.apply_zone(dest_zone);
47 relevant_pipes.push_back(pipe);
48 }
49 }
50 }
51
52 return relevant_pipes;
53}
54
55static bool is_wildcard_bucket(const rgw_bucket& bucket)
56{
57 return bucket.name.empty();
58}
59
60void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const
61{
62 encode_json("zone", zone.id, f);
63 encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f);
64 encode_json("sources", sources, f);
65 encode_json("dests", dests, f);
66}
67
68
69template <typename CB1, typename CB2>
70void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id& source_zone,
71 const rgw_zone_id& dest_zone,
72 const std::vector<rgw_sync_bucket_pipes>& pipes,
73 zb_pipe_map_t *pipe_map,
74 CB1 filter_cb,
75 CB2 call_filter_cb)
76{
77 if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
78 return;
79 }
80 auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
81
82 for (auto& pipe : relevant_pipes) {
83 rgw_sync_bucket_entity zb;
84 if (!call_filter_cb(pipe, &zb)) {
85 continue;
86 }
87 pipe_map->insert(make_pair(zb, pipe));
88 }
89}
90
91template <typename CB>
92void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id& source_zone,
93 const rgw_zone_id& dest_zone,
94 const std::vector<rgw_sync_bucket_pipes>& pipes,
95 CB filter_cb)
96{
97 return try_add_to_pipe_map(source_zone, dest_zone, pipes,
98 &sources,
99 filter_cb,
100 [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) {
101 *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()};
102 return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
103 });
104}
105
106template <typename CB>
107void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id& source_zone,
108 const rgw_zone_id& dest_zone,
109 const std::vector<rgw_sync_bucket_pipes>& pipes,
110 CB filter_cb)
111{
112 return try_add_to_pipe_map(source_zone, dest_zone, pipes,
113 &dests,
114 filter_cb,
115 [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) {
116 *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()};
117 return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
118 });
119}
120
121using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t;
122
123pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m,
124 const rgw_zone_id& zone,
125 std::optional<rgw_bucket> b) const
126{
127 if (!b) {
128 return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()});
129 }
130
131 auto zb = rgw_sync_bucket_entity{zone, *b};
132
133 auto range = m.equal_range(zb);
134 if (range.first == range.second &&
135 !is_wildcard_bucket(*b)) {
136 /* couldn't find the specific bucket, try to find by wildcard */
137 zb.bucket = rgw_bucket();
138 range = m.equal_range(zb);
139 }
140
141 return range;
142}
143
144
145template <typename CB>
20effc67
TL
146void rgw_sync_group_pipe_map::init(const DoutPrefixProvider *dpp,
147 CephContext *cct,
9f95a23c
TL
148 const rgw_zone_id& _zone,
149 std::optional<rgw_bucket> _bucket,
150 const rgw_sync_policy_group& group,
151 rgw_sync_data_flow_group *_default_flow,
152 std::set<rgw_zone_id> *_pall_zones,
153 CB filter_cb) {
154 zone = _zone;
155 bucket = _bucket;
156 default_flow = _default_flow;
157 pall_zones = _pall_zones;
158
159 rgw_sync_bucket_entity zb(zone, bucket);
160
161 status = group.status;
162
163 std::vector<rgw_sync_bucket_pipes> zone_pipes;
164
165 string bucket_key = (bucket ? bucket->get_key() : "*");
166
167 /* only look at pipes that touch the specific zone and bucket */
168 for (auto& pipe : group.pipes) {
169 if (pipe.contains_zone_bucket(zone, bucket)) {
20effc67 170 ldpp_dout(dpp, 20) << __func__ << "(): pipe_map (zone=" << zone << " bucket=" << bucket_key << "): adding potential pipe: " << pipe << dendl;
9f95a23c
TL
171 zone_pipes.push_back(pipe);
172 }
173 }
174
175 const rgw_sync_data_flow_group *pflow;
176
177 if (!group.data_flow.empty()) {
178 pflow = &group.data_flow;
179 } else {
180 if (!default_flow) {
181 return;
182 }
183 pflow = default_flow;
184 }
185
186 auto& flow = *pflow;
187
188 pall_zones->insert(zone);
189
190 /* symmetrical */
191 for (auto& symmetrical_group : flow.symmetrical) {
192 if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
193 for (auto& z : symmetrical_group.zones) {
194 if (z != zone) {
195 pall_zones->insert(z);
196 try_add_source(z, zone, zone_pipes, filter_cb);
197 try_add_dest(zone, z, zone_pipes, filter_cb);
198 }
199 }
200 }
201 }
202
203 /* directional */
204 for (auto& rule : flow.directional) {
205 if (rule.source_zone == zone) {
206 pall_zones->insert(rule.dest_zone);
207 try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
208 } else if (rule.dest_zone == zone) {
209 pall_zones->insert(rule.source_zone);
210 try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
211 }
212 }
213}
214
215/*
216 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
217 */
218vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id& source_zone,
219 std::optional<rgw_bucket> source_bucket,
220 std::optional<rgw_bucket> dest_bucket) const {
221 vector<rgw_sync_bucket_pipe> result;
222
223 auto range = find_pipes(sources, source_zone, source_bucket);
224
225 for (auto iter = range.first; iter != range.second; ++iter) {
226 auto pipe = iter->second;
227 if (pipe.dest.match_bucket(dest_bucket)) {
228 result.push_back(pipe);
229 }
230 }
231 return result;
232}
233
234/*
235 * find all relevant pipes in other zones that pull from a specific
236 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
237 */
238vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket,
239 const rgw_zone_id& dest_zone,
240 std::optional<rgw_bucket> dest_bucket) const {
241 vector<rgw_sync_bucket_pipe> result;
242
243 auto range = find_pipes(dests, dest_zone, dest_bucket);
244
245 for (auto iter = range.first; iter != range.second; ++iter) {
246 auto pipe = iter->second;
247 if (pipe.source.match_bucket(source_bucket)) {
248 result.push_back(pipe);
249 }
250 }
251
252 return result;
253}
254
255/*
256 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
257 */
258vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id& source_zone,
259 std::optional<rgw_bucket> source_bucket,
260 const rgw_zone_id& dest_zone,
261 std::optional<rgw_bucket> dest_bucket) const {
262 if (dest_zone == zone) {
263 return find_source_pipes(source_zone, source_bucket, dest_bucket);
264 }
265
266 if (source_zone == zone) {
267 return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
268 }
269
270 return vector<rgw_sync_bucket_pipe>();
271}
272
273void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
274{
275 pipes.push_back(pipe);
276
277 auto ppipe = &pipes.back();
278 auto prefix = ppipe->params.source.filter.prefix.value_or(string());
279
280 prefix_refs.insert(make_pair(prefix, ppipe));
281
282 for (auto& t : ppipe->params.source.filter.tags) {
283 string tag = t.key + "=" + t.value;
284 auto titer = tag_refs.find(tag);
285 if (titer != tag_refs.end() &&
286 ppipe->params.priority > titer->second->params.priority) {
287 titer->second = ppipe;
288 } else {
289 tag_refs[tag] = ppipe;
290 }
291 }
292}
293
294bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
295 std::optional<rgw_user> *user,
296 std::optional<rgw_user> *acl_translation_owner,
297 std::optional<string> *storage_class,
298 rgw_sync_pipe_params::Mode *mode,
299 bool *need_more_info) const
300{
301 std::optional<string> owner;
302
303 *need_more_info = false;
304
305 if (prefix_refs.empty()) {
306 return false;
307 }
308
309 auto end = prefix_refs.upper_bound(key.name);
310 auto iter = end;
311 if (iter != prefix_refs.begin()) {
312 --iter;
313 }
314 if (iter == prefix_refs.end()) {
315 return false;
316 }
317
318 if (iter != prefix_refs.begin()) {
319 iter = prefix_refs.find(iter->first); /* prefix_refs is multimap, find first element
320 holding that key */
321 }
322
323 std::vector<decltype(iter)> iters;
324
325 std::optional<int> priority;
326
327 for (; iter != end; ++iter) {
328 auto& prefix = iter->first;
329 if (!boost::starts_with(key.name, prefix)) {
330 continue;
331 }
332
333 auto& rule_params = iter->second->params;
334 auto& filter = rule_params.source.filter;
335
336 if (rule_params.priority > priority) {
337 priority = rule_params.priority;
338
339 if (!filter.has_tags()) {
340 iters.clear();
341 }
342 iters.push_back(iter);
343
344 *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then
345 we can't be sure if it would be used.
346 We need to first read the info from the source object */
347 }
348 }
349
350 if (iters.empty()) {
351 return false;
352 }
353
9f95a23c
TL
354 std::optional<rgw_user> _user;
355 std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
356 std::optional<string> _storage_class;
20effc67 357 rgw_sync_pipe_params::Mode _mode{rgw_sync_pipe_params::Mode::MODE_SYSTEM};
9f95a23c 358
20effc67
TL
359 // make sure all params are the same by saving the first one
360 // encountered and comparing all subsequent to it
361 bool first_iter = true;
9f95a23c 362 for (auto& iter : iters) {
20effc67
TL
363 const rgw_sync_pipe_params& rule_params = iter->second->params;
364 if (first_iter) {
9f95a23c
TL
365 _user = rule_params.user;
366 _acl_translation = rule_params.dest.acl_translation;
367 _storage_class = rule_params.dest.storage_class;
368 _mode = rule_params.mode;
20effc67
TL
369 first_iter = false;
370 } else {
371 // note: three of these == operators are comparing std::optional
372 // against std::optional; as one would expect they are equal a)
373 // if both do not contain values or b) if both do and those
374 // contained values are the same
375 const bool conflict =
376 !(_user == rule_params.user &&
377 _acl_translation == rule_params.dest.acl_translation &&
378 _storage_class == rule_params.dest.storage_class &&
379 _mode == rule_params.mode);
380 if (conflict) {
381 *need_more_info = true;
382 return false;
383 }
9f95a23c
TL
384 }
385 }
386
387 *user = _user;
388 if (_acl_translation) {
389 *acl_translation_owner = _acl_translation->owner;
390 }
391 *storage_class = _storage_class;
392 *mode = _mode;
393
394 return true;
395}
396
397bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
398 const RGWObjTags::tag_map_t& tags,
399 rgw_sync_pipe_params *params) const
400{
401 if (prefix_refs.empty()) {
402 return false;
403 }
404
405 auto iter = prefix_refs.upper_bound(key.name);
406 if (iter != prefix_refs.begin()) {
407 --iter;
408 }
409 if (iter == prefix_refs.end()) {
410 return false;
411 }
412
413 auto end = prefix_refs.upper_bound(key.name);
414 auto max = end;
415
416 std::optional<int> priority;
417
418 for (; iter != end; ++iter) {
419 /* NOTE: this is not the most efficient way to do it,
420 * a trie data structure would be better
421 */
422 auto& prefix = iter->first;
423 if (!boost::starts_with(key.name, prefix)) {
424 continue;
425 }
426
427 auto& rule_params = iter->second->params;
428 auto& filter = rule_params.source.filter;
429
430 if (!filter.check_tags(tags)) {
431 continue;
432 }
433
434 if (rule_params.priority > priority) {
435 priority = rule_params.priority;
436 max = iter;
437 }
438 }
439
440 if (max == end) {
441 return false;
442 }
443
444 *params = max->second->params;
445 return true;
446}
447
448/*
449 * return either the current prefix for s, or the next one if s is not within a prefix
450 */
451
452RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const
453{
454 if (prefix_refs.empty()) {
455 return prefix_refs.end();
456 }
457 auto next = prefix_refs.upper_bound(s);
458 auto iter = next;
459 if (iter != prefix_refs.begin()) {
460 --iter;
461 }
462 if (!boost::starts_with(s, iter->first)) {
463 return next;
464 }
465
466 return iter;
467}
468
469void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
1e59de90
TL
470 /* Ensure this pipe doesn't match with any disabled pipes */
471 for (auto p: disabled_pipe_map) {
472 if (p.second.source.match(pipe.source) && p.second.dest.match(pipe.dest)) {
473 return;
474 }
475 }
9f95a23c
TL
476 pipe_map.insert(make_pair(pipe.id, pipe));
477
478 auto& rules_ref = rules[endpoints_pair(pipe)];
479
480 if (!rules_ref) {
481 rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>();
482 }
483
484 rules_ref->insert(pipe);
485
486 pipe_handler h(rules_ref, pipe);
487
488 handlers.insert(h);
489}
490
1e59de90
TL
491void RGWBucketSyncFlowManager::pipe_set::remove_all() {
492 pipe_map.clear();
493 disabled_pipe_map.clear();
494 rules.clear();
495 handlers.clear();
496}
497
498void RGWBucketSyncFlowManager::pipe_set::disable(const rgw_sync_bucket_pipe& pipe) {
499 /* This pipe is disabled. Add it to disabled pipes & remove any
500 * matching pipes already inserted
501 */
502 disabled_pipe_map.insert(make_pair(pipe.id, pipe));
503 for (auto iter_p = pipe_map.begin(); iter_p != pipe_map.end(); ) {
504 auto p = iter_p++;
505 if (p->second.source.match(pipe.source) && p->second.dest.match(pipe.dest)) {
506 auto& rules_ref = rules[endpoints_pair(p->second)];
507 if (rules_ref) {
508 pipe_handler h(rules_ref, p->second);
509 handlers.erase(h);
510 }
511 rules.erase(endpoints_pair(p->second));
512 pipe_map.erase(p);
513 }
514 }
515}
516
9f95a23c
TL
517void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
518{
519 encode_json("pipes", pipe_map, f);
520}
521
522bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id& source_zone,
523 std::optional<rgw_bucket> source_bucket,
524 const rgw_zone_id& dest_zone,
525 std::optional<rgw_bucket> dest_bucket,
526 bool check_activated) const
527{
528 bool found = false;
529 bool found_activated = false;
530
531 for (auto m : flow_groups) {
532 auto& fm = m.second;
533 auto pipes = fm.find_pipes(source_zone, source_bucket,
534 dest_zone, dest_bucket);
535
536 bool is_found = !pipes.empty();
537
538 if (is_found) {
539 switch (fm.status) {
540 case rgw_sync_policy_group::Status::FORBIDDEN:
541 return false;
542 case rgw_sync_policy_group::Status::ENABLED:
543 found = true;
544 found_activated = true;
545 break;
546 case rgw_sync_policy_group::Status::ALLOWED:
547 found = true;
548 break;
549 default:
550 break; /* unknown -- ignore */
551 }
552 }
553 }
554
555 if (check_activated && found_activated) {
556 return true;
557 }
558
559 return found;
560}
561
20effc67 562void RGWBucketSyncFlowManager::init(const DoutPrefixProvider *dpp, const rgw_sync_policy_info& sync_policy) {
9f95a23c
TL
563 std::optional<rgw_sync_data_flow_group> default_flow;
564 if (parent) {
565 default_flow.emplace();
566 default_flow->init_default(parent->all_zones);
567 }
568
569 for (auto& item : sync_policy.groups) {
570 auto& group = item.second;
571 auto& flow_group_map = flow_groups[group.id];
572
20effc67 573 flow_group_map.init(dpp, cct, zone_id, bucket, group,
9f95a23c
TL
574 (default_flow ? &(*default_flow) : nullptr),
575 &all_zones,
576 [&](const rgw_zone_id& source_zone,
577 std::optional<rgw_bucket> source_bucket,
578 const rgw_zone_id& dest_zone,
579 std::optional<rgw_bucket> dest_bucket) {
580 if (!parent) {
581 return true;
582 }
583 return parent->allowed_data_flow(source_zone,
584 source_bucket,
585 dest_zone,
586 dest_bucket,
587 false); /* just check that it's not disabled */
588 });
589 }
590}
591
1e59de90
TL
/*
 * These are the semantics to be followed while resolving policy
 * conflicts -
 *
 * ==================================================
 * zonegroup      bucket         Result
 * ==================================================
 * enabled        enabled        enabled
 *                allowed        enabled
 *                forbidden      disabled
 * allowed        enabled        enabled
 *                allowed        disabled
 *                forbidden      disabled
 * forbidden      enabled        disabled
 *                allowed        disabled
 *                forbidden      disabled
 *
 * In case multiple group policies are set to reflect for any sync pair
 * (<source-zone,source-bucket>, <dest-zone,dest-bucket>), the following
 * rules are applied in order -
 * 1) If even one policy status is FORBIDDEN, the sync will be disabled
 * 2) At least one policy should be ENABLED for the sync to be allowed.
 *
 */
20effc67
TL
616void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
617 std::optional<rgw_bucket> effective_bucket,
9f95a23c
TL
618 RGWBucketSyncFlowManager::pipe_set *source_pipes,
619 RGWBucketSyncFlowManager::pipe_set *dest_pipes,
620 bool only_enabled) const
621
622{
623 string effective_bucket_key;
1e59de90 624 bool is_forbidden = false;
9f95a23c
TL
625 if (effective_bucket) {
626 effective_bucket_key = effective_bucket->get_key();
627 }
628 if (parent) {
20effc67 629 parent->reflect(dpp, effective_bucket, source_pipes, dest_pipes, only_enabled);
9f95a23c
TL
630 }
631
632 for (auto& item : flow_groups) {
633 auto& flow_group_map = item.second;
1e59de90
TL
634 is_forbidden = false;
635
636 if (flow_group_map.status == rgw_sync_policy_group::Status::FORBIDDEN) {
637 /* FORBIDDEN takes precedence over all the other rules.
638 * Remove any other pipes which may allow access.
639 */
640 is_forbidden = true;
641 } else if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
9f95a23c 642 (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
1e59de90 643 /* only return enabled groups */
9f95a23c
TL
644 continue;
645 }
646
647 for (auto& entry : flow_group_map.sources) {
648 rgw_sync_bucket_pipe pipe = entry.second;
649 if (!pipe.dest.match_bucket(effective_bucket)) {
650 continue;
651 }
652
653 pipe.source.apply_bucket(effective_bucket);
654 pipe.dest.apply_bucket(effective_bucket);
655
1e59de90
TL
656 if (is_forbidden) {
657 ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing source pipe: " << pipe << dendl;
658 source_pipes->disable(pipe);
659 } else {
660 ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
661 source_pipes->insert(pipe);
662 }
9f95a23c
TL
663 }
664
665 for (auto& entry : flow_group_map.dests) {
666 rgw_sync_bucket_pipe pipe = entry.second;
667
668 if (!pipe.source.match_bucket(effective_bucket)) {
669 continue;
670 }
671
672 pipe.source.apply_bucket(effective_bucket);
673 pipe.dest.apply_bucket(effective_bucket);
674
1e59de90
TL
675 if (is_forbidden) {
676 ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing dest pipe: " << pipe << dendl;
677 dest_pipes->disable(pipe);
678 } else {
679 ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
680 dest_pipes->insert(pipe);
681 }
9f95a23c
TL
682 }
683 }
684}
685
686
687RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext *_cct,
688 const rgw_zone_id& _zone_id,
689 std::optional<rgw_bucket> _bucket,
690 const RGWBucketSyncFlowManager *_parent) : cct(_cct),
691 zone_id(_zone_id),
692 bucket(_bucket),
693 parent(_parent) {}
694
695
696void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc,
697 RGWSI_SyncModules *sync_modules_svc,
698 rgw_sync_policy_info *ppolicy)
699{
700 bool found = false;
701
702 rgw_sync_policy_info policy;
703
704 auto& group = policy.groups["default"];
705 auto& zonegroup = zone_svc->get_zonegroup();
706
707 for (const auto& ziter1 : zonegroup.zones) {
708 auto& id1 = ziter1.first;
709 const RGWZone& z1 = ziter1.second;
710
711 for (const auto& ziter2 : zonegroup.zones) {
712 auto& id2 = ziter2.first;
713 const RGWZone& z2 = ziter2.second;
714
715 if (id1 == id2) {
716 continue;
717 }
718
719 if (z1.syncs_from(z2.name)) {
720 found = true;
721 rgw_sync_directional_rule *rule;
722 group.data_flow.find_or_create_directional(id2,
723 id1,
724 &rule);
725 }
726 }
727 }
728
729 if (!found) { /* nothing syncs */
730 return;
731 }
732
733 rgw_sync_bucket_pipes pipes;
734 pipes.id = "all";
735 pipes.source.all_zones = true;
736 pipes.dest.all_zones = true;
737
738 group.pipes.emplace_back(std::move(pipes));
739
740
741 group.status = rgw_sync_policy_group::Status::ENABLED;
742
743 *ppolicy = std::move(policy);
744}
745
746RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
747 RGWSI_SyncModules *sync_modules_svc,
748 RGWSI_Bucket_Sync *_bucket_sync_svc,
749 std::optional<rgw_zone_id> effective_zone) : zone_svc(_zone_svc) ,
750 bucket_sync_svc(_bucket_sync_svc) {
751 zone_id = effective_zone.value_or(zone_svc->zone_id());
752 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
753 zone_id,
754 nullopt,
755 nullptr));
756 sync_policy = zone_svc->get_zonegroup().sync_policy;
757
758 if (sync_policy.empty()) {
759 RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy);
760 legacy_config = true;
761 }
762}
763
764RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
765 const RGWBucketInfo& _bucket_info,
766 map<string, bufferlist>&& _bucket_attrs) : parent(_parent),
767 bucket_info(_bucket_info),
768 bucket_attrs(std::move(_bucket_attrs)) {
769 if (_bucket_info.sync_policy) {
770 sync_policy = *_bucket_info.sync_policy;
771
772 for (auto& entry : sync_policy.groups) {
773 for (auto& pipe : entry.second.pipes) {
774 if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER &&
775 pipe.params.user.empty()) {
776 pipe.params.user = _bucket_info.owner;
777 }
778 }
779 }
780 }
781 legacy_config = parent->legacy_config;
782 bucket = _bucket_info.bucket;
783 zone_svc = parent->zone_svc;
784 bucket_sync_svc = parent->bucket_sync_svc;
785 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
786 parent->zone_id,
787 _bucket_info.bucket,
788 parent->flow_mgr.get()));
789}
790
791RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
792 const rgw_bucket& _bucket,
793 std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) {
794 if (_sync_policy) {
795 sync_policy = *_sync_policy;
796 }
797 legacy_config = parent->legacy_config;
798 bucket = _bucket;
799 zone_svc = parent->zone_svc;
800 bucket_sync_svc = parent->bucket_sync_svc;
801 flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(),
802 parent->zone_id,
803 _bucket,
804 parent->flow_mgr.get()));
805}
806
807RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info,
808 map<string, bufferlist>&& bucket_attrs) const
809{
810 return new RGWBucketSyncPolicyHandler(this, bucket_info, std::move(bucket_attrs));
811}
812
813RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket,
814 std::optional<rgw_sync_policy_info> sync_policy) const
815{
816 return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
817}
818
b3b6e05e 819int RGWBucketSyncPolicyHandler::init(const DoutPrefixProvider *dpp, optional_yield y)
9f95a23c 820{
b3b6e05e 821 int r = bucket_sync_svc->get_bucket_sync_hints(dpp, bucket.value_or(rgw_bucket()),
9f95a23c
TL
822 &source_hints,
823 &target_hints,
824 y);
825 if (r < 0) {
b3b6e05e 826 ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket="
9f95a23c
TL
827 << bucket << " returned r=" << r << dendl;
828 return r;
829 }
830
20effc67 831 flow_mgr->init(dpp, sync_policy);
9f95a23c 832
20effc67 833 reflect(dpp, &source_pipes,
9f95a23c
TL
834 &target_pipes,
835 &sources,
836 &targets,
837 &source_zones,
838 &target_zones,
839 true);
840
841 return 0;
842}
843
20effc67 844void RGWBucketSyncPolicyHandler::reflect(const DoutPrefixProvider *dpp, RGWBucketSyncFlowManager::pipe_set *psource_pipes,
9f95a23c
TL
845 RGWBucketSyncFlowManager::pipe_set *ptarget_pipes,
846 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources,
847 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets,
848 std::set<rgw_zone_id> *psource_zones,
849 std::set<rgw_zone_id> *ptarget_zones,
850 bool only_enabled) const
851{
852 RGWBucketSyncFlowManager::pipe_set _source_pipes;
853 RGWBucketSyncFlowManager::pipe_set _target_pipes;
854 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _sources;
855 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _targets;
856 std::set<rgw_zone_id> _source_zones;
857 std::set<rgw_zone_id> _target_zones;
858
20effc67 859 flow_mgr->reflect(dpp, bucket, &_source_pipes, &_target_pipes, only_enabled);
9f95a23c
TL
860
861 for (auto& entry : _source_pipes.pipe_map) {
862 auto& pipe = entry.second;
863 if (!pipe.source.zone) {
864 continue;
865 }
866 _source_zones.insert(*pipe.source.zone);
867 _sources[*pipe.source.zone].insert(pipe);
868 }
869
870 for (auto& entry : _target_pipes.pipe_map) {
871 auto& pipe = entry.second;
872 if (!pipe.dest.zone) {
873 continue;
874 }
875 _target_zones.insert(*pipe.dest.zone);
876 _targets[*pipe.dest.zone].insert(pipe);
877 }
878
879 if (psource_pipes) {
880 *psource_pipes = std::move(_source_pipes);
881 }
882 if (ptarget_pipes) {
883 *ptarget_pipes = std::move(_target_pipes);
884 }
885 if (psources) {
886 *psources = std::move(_sources);
887 }
888 if (ptargets) {
889 *ptargets = std::move(_targets);
890 }
891 if (psource_zones) {
892 *psource_zones = std::move(_source_zones);
893 }
894 if (ptarget_zones) {
895 *ptarget_zones = std::move(_target_zones);
896 }
897}
898
899multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const
900{
901 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
902
903 for (auto& source_entry : sources) {
904 auto& zone_id = source_entry.first;
905
906 auto& pipes = source_entry.second.pipe_map;
907
908 for (auto& entry : pipes) {
909 auto& pipe = entry.second;
910 m.insert(make_pair(zone_id, pipe));
911 }
912 }
913
914 for (auto& pipe : resolved_sources) {
915 if (!pipe.source.zone) {
916 continue;
917 }
918
919 m.insert(make_pair(*pipe.source.zone, pipe));
920 }
921
922 return m;
923}
924
925multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const
926{
927 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
928
929 for (auto& dest_entry : targets) {
930 auto& zone_id = dest_entry.first;
931
932 auto& pipes = dest_entry.second.pipe_map;
933
934 for (auto& entry : pipes) {
935 auto& pipe = entry.second;
936 m.insert(make_pair(zone_id, pipe));
937 }
938 }
939
940 for (auto& pipe : resolved_dests) {
941 if (!pipe.dest.zone) {
942 continue;
943 }
944
945 m.insert(make_pair(*pipe.dest.zone, pipe));
946 }
947
948 return m;
949}
950
951multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const
952{
953 multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
954
955 auto iter = targets.find(zone_id);
956 if (iter != targets.end()) {
957 auto& pipes = iter->second.pipe_map;
958
959 for (auto& entry : pipes) {
960 auto& pipe = entry.second;
961 m.insert(make_pair(zone_id, pipe));
962 }
963 }
964
965 for (auto& pipe : resolved_dests) {
966 if (!pipe.dest.zone ||
967 *pipe.dest.zone != zone_id) {
968 continue;
969 }
970
971 m.insert(make_pair(*pipe.dest.zone, pipe));
972 }
973
974 return m;
975}
976
977void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
978 std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */
979 for (auto& entry : source_pipes.pipe_map) {
980 auto& source_pipe = entry.second;
981 if (!filter_peer ||
982 source_pipe.source.match(*filter_peer)) {
983 _sources->insert(source_pipe);
984 }
985 }
986
987 for (auto& entry : target_pipes.pipe_map) {
988 auto& target_pipe = entry.second;
989 if (!filter_peer ||
990 target_pipe.dest.match(*filter_peer)) {
991 _targets->insert(target_pipe);
992 }
993 }
994}
995
996bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
997{
998 if (!bucket) {
999 return false;
1000 }
1001
1e59de90
TL
1002 if (!zone_svc->sync_module_exports_data()) {
1003 return false;
1004 }
1005
9f95a23c
TL
1006 if (bucket_is_sync_source()) {
1007 return true;
1008 }
1009
1010 return (zone_svc->need_to_log_data() &&
1011 bucket_info->datasync_flag_enabled());
1012}
1013
1014bool RGWBucketSyncPolicyHandler::bucket_imports_data() const
1015{
1016 return bucket_is_sync_target();
1017}
1018