]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
9f95a23c TL |
3 | |
4 | #include "rgw_common.h" | |
5 | #include "rgw_bucket_sync.h" | |
6 | #include "rgw_data_sync.h" | |
7 | #include "rgw_zone.h" | |
8 | ||
9 | #include "services/svc_zone.h" | |
10 | #include "services/svc_bucket_sync.h" | |
11 | ||
12 | #define dout_subsys ceph_subsys_rgw | |
13 | ||
20effc67 | 14 | using namespace std; |
9f95a23c TL |
15 | |
16 | ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { | |
17 | os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or(rgw_zone_id()) << ",az=" << (int)e.all_zones << "}"; | |
18 | return os; | |
19 | } | |
20 | ||
21 | ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { | |
22 | os << "{s=" << pipe.source << ",d=" << pipe.dest << "}"; | |
23 | return os; | |
24 | } | |
25 | ||
26 | ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) { | |
27 | os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<rgw_zone_id>()) << "}"; | |
28 | return os; | |
29 | } | |
30 | ||
31 | ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) { | |
32 | os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}"; | |
33 | return os; | |
34 | } | |
35 | ||
36 | static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes, | |
37 | const rgw_zone_id& source_zone, | |
38 | const rgw_zone_id& dest_zone) | |
39 | { | |
40 | std::vector<rgw_sync_bucket_pipe> relevant_pipes; | |
41 | for (auto& p : pipes) { | |
42 | if (p.source.match_zone(source_zone) && | |
43 | p.dest.match_zone(dest_zone)) { | |
44 | for (auto pipe : p.expand()) { | |
45 | pipe.source.apply_zone(source_zone); | |
46 | pipe.dest.apply_zone(dest_zone); | |
47 | relevant_pipes.push_back(pipe); | |
48 | } | |
49 | } | |
50 | } | |
51 | ||
52 | return relevant_pipes; | |
53 | } | |
54 | ||
55 | static bool is_wildcard_bucket(const rgw_bucket& bucket) | |
56 | { | |
57 | return bucket.name.empty(); | |
58 | } | |
59 | ||
60 | void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const | |
61 | { | |
62 | encode_json("zone", zone.id, f); | |
63 | encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); | |
64 | encode_json("sources", sources, f); | |
65 | encode_json("dests", dests, f); | |
66 | } | |
67 | ||
68 | ||
69 | template <typename CB1, typename CB2> | |
70 | void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id& source_zone, | |
71 | const rgw_zone_id& dest_zone, | |
72 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
73 | zb_pipe_map_t *pipe_map, | |
74 | CB1 filter_cb, | |
75 | CB2 call_filter_cb) | |
76 | { | |
77 | if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) { | |
78 | return; | |
79 | } | |
80 | auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone); | |
81 | ||
82 | for (auto& pipe : relevant_pipes) { | |
83 | rgw_sync_bucket_entity zb; | |
84 | if (!call_filter_cb(pipe, &zb)) { | |
85 | continue; | |
86 | } | |
87 | pipe_map->insert(make_pair(zb, pipe)); | |
88 | } | |
89 | } | |
90 | ||
91 | template <typename CB> | |
92 | void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id& source_zone, | |
93 | const rgw_zone_id& dest_zone, | |
94 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
95 | CB filter_cb) | |
96 | { | |
97 | return try_add_to_pipe_map(source_zone, dest_zone, pipes, | |
98 | &sources, | |
99 | filter_cb, | |
100 | [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { | |
101 | *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()}; | |
102 | return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket()); | |
103 | }); | |
104 | } | |
105 | ||
106 | template <typename CB> | |
107 | void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id& source_zone, | |
108 | const rgw_zone_id& dest_zone, | |
109 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
110 | CB filter_cb) | |
111 | { | |
112 | return try_add_to_pipe_map(source_zone, dest_zone, pipes, | |
113 | &dests, | |
114 | filter_cb, | |
115 | [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { | |
116 | *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()}; | |
117 | return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket); | |
118 | }); | |
119 | } | |
120 | ||
121 | using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t; | |
122 | ||
123 | pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m, | |
124 | const rgw_zone_id& zone, | |
125 | std::optional<rgw_bucket> b) const | |
126 | { | |
127 | if (!b) { | |
128 | return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()}); | |
129 | } | |
130 | ||
131 | auto zb = rgw_sync_bucket_entity{zone, *b}; | |
132 | ||
133 | auto range = m.equal_range(zb); | |
134 | if (range.first == range.second && | |
135 | !is_wildcard_bucket(*b)) { | |
136 | /* couldn't find the specific bucket, try to find by wildcard */ | |
137 | zb.bucket = rgw_bucket(); | |
138 | range = m.equal_range(zb); | |
139 | } | |
140 | ||
141 | return range; | |
142 | } | |
143 | ||
144 | ||
145 | template <typename CB> | |
20effc67 TL |
146 | void rgw_sync_group_pipe_map::init(const DoutPrefixProvider *dpp, |
147 | CephContext *cct, | |
9f95a23c TL |
148 | const rgw_zone_id& _zone, |
149 | std::optional<rgw_bucket> _bucket, | |
150 | const rgw_sync_policy_group& group, | |
151 | rgw_sync_data_flow_group *_default_flow, | |
152 | std::set<rgw_zone_id> *_pall_zones, | |
153 | CB filter_cb) { | |
154 | zone = _zone; | |
155 | bucket = _bucket; | |
156 | default_flow = _default_flow; | |
157 | pall_zones = _pall_zones; | |
158 | ||
159 | rgw_sync_bucket_entity zb(zone, bucket); | |
160 | ||
161 | status = group.status; | |
162 | ||
163 | std::vector<rgw_sync_bucket_pipes> zone_pipes; | |
164 | ||
165 | string bucket_key = (bucket ? bucket->get_key() : "*"); | |
166 | ||
167 | /* only look at pipes that touch the specific zone and bucket */ | |
168 | for (auto& pipe : group.pipes) { | |
169 | if (pipe.contains_zone_bucket(zone, bucket)) { | |
20effc67 | 170 | ldpp_dout(dpp, 20) << __func__ << "(): pipe_map (zone=" << zone << " bucket=" << bucket_key << "): adding potential pipe: " << pipe << dendl; |
9f95a23c TL |
171 | zone_pipes.push_back(pipe); |
172 | } | |
173 | } | |
174 | ||
175 | const rgw_sync_data_flow_group *pflow; | |
176 | ||
177 | if (!group.data_flow.empty()) { | |
178 | pflow = &group.data_flow; | |
179 | } else { | |
180 | if (!default_flow) { | |
181 | return; | |
182 | } | |
183 | pflow = default_flow; | |
184 | } | |
185 | ||
186 | auto& flow = *pflow; | |
187 | ||
188 | pall_zones->insert(zone); | |
189 | ||
190 | /* symmetrical */ | |
191 | for (auto& symmetrical_group : flow.symmetrical) { | |
192 | if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) { | |
193 | for (auto& z : symmetrical_group.zones) { | |
194 | if (z != zone) { | |
195 | pall_zones->insert(z); | |
196 | try_add_source(z, zone, zone_pipes, filter_cb); | |
197 | try_add_dest(zone, z, zone_pipes, filter_cb); | |
198 | } | |
199 | } | |
200 | } | |
201 | } | |
202 | ||
203 | /* directional */ | |
204 | for (auto& rule : flow.directional) { | |
205 | if (rule.source_zone == zone) { | |
206 | pall_zones->insert(rule.dest_zone); | |
207 | try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb); | |
208 | } else if (rule.dest_zone == zone) { | |
209 | pall_zones->insert(rule.source_zone); | |
210 | try_add_source(rule.source_zone, zone, zone_pipes, filter_cb); | |
211 | } | |
212 | } | |
213 | } | |
214 | ||
215 | /* | |
216 | * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} | |
217 | */ | |
218 | vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id& source_zone, | |
219 | std::optional<rgw_bucket> source_bucket, | |
220 | std::optional<rgw_bucket> dest_bucket) const { | |
221 | vector<rgw_sync_bucket_pipe> result; | |
222 | ||
223 | auto range = find_pipes(sources, source_zone, source_bucket); | |
224 | ||
225 | for (auto iter = range.first; iter != range.second; ++iter) { | |
226 | auto pipe = iter->second; | |
227 | if (pipe.dest.match_bucket(dest_bucket)) { | |
228 | result.push_back(pipe); | |
229 | } | |
230 | } | |
231 | return result; | |
232 | } | |
233 | ||
234 | /* | |
235 | * find all relevant pipes in other zones that pull from a specific | |
236 | * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} | |
237 | */ | |
238 | vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket, | |
239 | const rgw_zone_id& dest_zone, | |
240 | std::optional<rgw_bucket> dest_bucket) const { | |
241 | vector<rgw_sync_bucket_pipe> result; | |
242 | ||
243 | auto range = find_pipes(dests, dest_zone, dest_bucket); | |
244 | ||
245 | for (auto iter = range.first; iter != range.second; ++iter) { | |
246 | auto pipe = iter->second; | |
247 | if (pipe.source.match_bucket(source_bucket)) { | |
248 | result.push_back(pipe); | |
249 | } | |
250 | } | |
251 | ||
252 | return result; | |
253 | } | |
254 | ||
255 | /* | |
256 | * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} | |
257 | */ | |
258 | vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id& source_zone, | |
259 | std::optional<rgw_bucket> source_bucket, | |
260 | const rgw_zone_id& dest_zone, | |
261 | std::optional<rgw_bucket> dest_bucket) const { | |
262 | if (dest_zone == zone) { | |
263 | return find_source_pipes(source_zone, source_bucket, dest_bucket); | |
264 | } | |
265 | ||
266 | if (source_zone == zone) { | |
267 | return find_dest_pipes(source_bucket, dest_zone, dest_bucket); | |
268 | } | |
269 | ||
270 | return vector<rgw_sync_bucket_pipe>(); | |
271 | } | |
272 | ||
273 | void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe) | |
274 | { | |
275 | pipes.push_back(pipe); | |
276 | ||
277 | auto ppipe = &pipes.back(); | |
278 | auto prefix = ppipe->params.source.filter.prefix.value_or(string()); | |
279 | ||
280 | prefix_refs.insert(make_pair(prefix, ppipe)); | |
281 | ||
282 | for (auto& t : ppipe->params.source.filter.tags) { | |
283 | string tag = t.key + "=" + t.value; | |
284 | auto titer = tag_refs.find(tag); | |
285 | if (titer != tag_refs.end() && | |
286 | ppipe->params.priority > titer->second->params.priority) { | |
287 | titer->second = ppipe; | |
288 | } else { | |
289 | tag_refs[tag] = ppipe; | |
290 | } | |
291 | } | |
292 | } | |
293 | ||
294 | bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key, | |
295 | std::optional<rgw_user> *user, | |
296 | std::optional<rgw_user> *acl_translation_owner, | |
297 | std::optional<string> *storage_class, | |
298 | rgw_sync_pipe_params::Mode *mode, | |
299 | bool *need_more_info) const | |
300 | { | |
301 | std::optional<string> owner; | |
302 | ||
303 | *need_more_info = false; | |
304 | ||
305 | if (prefix_refs.empty()) { | |
306 | return false; | |
307 | } | |
308 | ||
309 | auto end = prefix_refs.upper_bound(key.name); | |
310 | auto iter = end; | |
311 | if (iter != prefix_refs.begin()) { | |
312 | --iter; | |
313 | } | |
314 | if (iter == prefix_refs.end()) { | |
315 | return false; | |
316 | } | |
317 | ||
318 | if (iter != prefix_refs.begin()) { | |
319 | iter = prefix_refs.find(iter->first); /* prefix_refs is multimap, find first element | |
320 | holding that key */ | |
321 | } | |
322 | ||
323 | std::vector<decltype(iter)> iters; | |
324 | ||
325 | std::optional<int> priority; | |
326 | ||
327 | for (; iter != end; ++iter) { | |
328 | auto& prefix = iter->first; | |
329 | if (!boost::starts_with(key.name, prefix)) { | |
330 | continue; | |
331 | } | |
332 | ||
333 | auto& rule_params = iter->second->params; | |
334 | auto& filter = rule_params.source.filter; | |
335 | ||
336 | if (rule_params.priority > priority) { | |
337 | priority = rule_params.priority; | |
338 | ||
339 | if (!filter.has_tags()) { | |
340 | iters.clear(); | |
341 | } | |
342 | iters.push_back(iter); | |
343 | ||
344 | *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then | |
345 | we can't be sure if it would be used. | |
346 | We need to first read the info from the source object */ | |
347 | } | |
348 | } | |
349 | ||
350 | if (iters.empty()) { | |
351 | return false; | |
352 | } | |
353 | ||
9f95a23c TL |
354 | std::optional<rgw_user> _user; |
355 | std::optional<rgw_sync_pipe_acl_translation> _acl_translation; | |
356 | std::optional<string> _storage_class; | |
20effc67 | 357 | rgw_sync_pipe_params::Mode _mode{rgw_sync_pipe_params::Mode::MODE_SYSTEM}; |
9f95a23c | 358 | |
20effc67 TL |
359 | // make sure all params are the same by saving the first one |
360 | // encountered and comparing all subsequent to it | |
361 | bool first_iter = true; | |
9f95a23c | 362 | for (auto& iter : iters) { |
20effc67 TL |
363 | const rgw_sync_pipe_params& rule_params = iter->second->params; |
364 | if (first_iter) { | |
9f95a23c TL |
365 | _user = rule_params.user; |
366 | _acl_translation = rule_params.dest.acl_translation; | |
367 | _storage_class = rule_params.dest.storage_class; | |
368 | _mode = rule_params.mode; | |
20effc67 TL |
369 | first_iter = false; |
370 | } else { | |
371 | // note: three of these == operators are comparing std::optional | |
372 | // against std::optional; as one would expect they are equal a) | |
373 | // if both do not contain values or b) if both do and those | |
374 | // contained values are the same | |
375 | const bool conflict = | |
376 | !(_user == rule_params.user && | |
377 | _acl_translation == rule_params.dest.acl_translation && | |
378 | _storage_class == rule_params.dest.storage_class && | |
379 | _mode == rule_params.mode); | |
380 | if (conflict) { | |
381 | *need_more_info = true; | |
382 | return false; | |
383 | } | |
9f95a23c TL |
384 | } |
385 | } | |
386 | ||
387 | *user = _user; | |
388 | if (_acl_translation) { | |
389 | *acl_translation_owner = _acl_translation->owner; | |
390 | } | |
391 | *storage_class = _storage_class; | |
392 | *mode = _mode; | |
393 | ||
394 | return true; | |
395 | } | |
396 | ||
397 | bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key, | |
398 | const RGWObjTags::tag_map_t& tags, | |
399 | rgw_sync_pipe_params *params) const | |
400 | { | |
401 | if (prefix_refs.empty()) { | |
402 | return false; | |
403 | } | |
404 | ||
405 | auto iter = prefix_refs.upper_bound(key.name); | |
406 | if (iter != prefix_refs.begin()) { | |
407 | --iter; | |
408 | } | |
409 | if (iter == prefix_refs.end()) { | |
410 | return false; | |
411 | } | |
412 | ||
413 | auto end = prefix_refs.upper_bound(key.name); | |
414 | auto max = end; | |
415 | ||
416 | std::optional<int> priority; | |
417 | ||
418 | for (; iter != end; ++iter) { | |
419 | /* NOTE: this is not the most efficient way to do it, | |
420 | * a trie data structure would be better | |
421 | */ | |
422 | auto& prefix = iter->first; | |
423 | if (!boost::starts_with(key.name, prefix)) { | |
424 | continue; | |
425 | } | |
426 | ||
427 | auto& rule_params = iter->second->params; | |
428 | auto& filter = rule_params.source.filter; | |
429 | ||
430 | if (!filter.check_tags(tags)) { | |
431 | continue; | |
432 | } | |
433 | ||
434 | if (rule_params.priority > priority) { | |
435 | priority = rule_params.priority; | |
436 | max = iter; | |
437 | } | |
438 | } | |
439 | ||
440 | if (max == end) { | |
441 | return false; | |
442 | } | |
443 | ||
444 | *params = max->second->params; | |
445 | return true; | |
446 | } | |
447 | ||
448 | /* | |
449 | * return either the current prefix for s, or the next one if s is not within a prefix | |
450 | */ | |
451 | ||
452 | RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const | |
453 | { | |
454 | if (prefix_refs.empty()) { | |
455 | return prefix_refs.end(); | |
456 | } | |
457 | auto next = prefix_refs.upper_bound(s); | |
458 | auto iter = next; | |
459 | if (iter != prefix_refs.begin()) { | |
460 | --iter; | |
461 | } | |
462 | if (!boost::starts_with(s, iter->first)) { | |
463 | return next; | |
464 | } | |
465 | ||
466 | return iter; | |
467 | } | |
468 | ||
469 | void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) { | |
1e59de90 TL |
470 | /* Ensure this pipe doesn't match with any disabled pipes */ |
471 | for (auto p: disabled_pipe_map) { | |
472 | if (p.second.source.match(pipe.source) && p.second.dest.match(pipe.dest)) { | |
473 | return; | |
474 | } | |
475 | } | |
9f95a23c TL |
476 | pipe_map.insert(make_pair(pipe.id, pipe)); |
477 | ||
478 | auto& rules_ref = rules[endpoints_pair(pipe)]; | |
479 | ||
480 | if (!rules_ref) { | |
481 | rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>(); | |
482 | } | |
483 | ||
484 | rules_ref->insert(pipe); | |
485 | ||
486 | pipe_handler h(rules_ref, pipe); | |
487 | ||
488 | handlers.insert(h); | |
489 | } | |
490 | ||
1e59de90 TL |
491 | void RGWBucketSyncFlowManager::pipe_set::remove_all() { |
492 | pipe_map.clear(); | |
493 | disabled_pipe_map.clear(); | |
494 | rules.clear(); | |
495 | handlers.clear(); | |
496 | } | |
497 | ||
498 | void RGWBucketSyncFlowManager::pipe_set::disable(const rgw_sync_bucket_pipe& pipe) { | |
499 | /* This pipe is disabled. Add it to disabled pipes & remove any | |
500 | * matching pipes already inserted | |
501 | */ | |
502 | disabled_pipe_map.insert(make_pair(pipe.id, pipe)); | |
503 | for (auto iter_p = pipe_map.begin(); iter_p != pipe_map.end(); ) { | |
504 | auto p = iter_p++; | |
505 | if (p->second.source.match(pipe.source) && p->second.dest.match(pipe.dest)) { | |
506 | auto& rules_ref = rules[endpoints_pair(p->second)]; | |
507 | if (rules_ref) { | |
508 | pipe_handler h(rules_ref, p->second); | |
509 | handlers.erase(h); | |
510 | } | |
511 | rules.erase(endpoints_pair(p->second)); | |
512 | pipe_map.erase(p); | |
513 | } | |
514 | } | |
515 | } | |
516 | ||
9f95a23c TL |
517 | void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const |
518 | { | |
519 | encode_json("pipes", pipe_map, f); | |
520 | } | |
521 | ||
522 | bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id& source_zone, | |
523 | std::optional<rgw_bucket> source_bucket, | |
524 | const rgw_zone_id& dest_zone, | |
525 | std::optional<rgw_bucket> dest_bucket, | |
526 | bool check_activated) const | |
527 | { | |
528 | bool found = false; | |
529 | bool found_activated = false; | |
530 | ||
531 | for (auto m : flow_groups) { | |
532 | auto& fm = m.second; | |
533 | auto pipes = fm.find_pipes(source_zone, source_bucket, | |
534 | dest_zone, dest_bucket); | |
535 | ||
536 | bool is_found = !pipes.empty(); | |
537 | ||
538 | if (is_found) { | |
539 | switch (fm.status) { | |
540 | case rgw_sync_policy_group::Status::FORBIDDEN: | |
541 | return false; | |
542 | case rgw_sync_policy_group::Status::ENABLED: | |
543 | found = true; | |
544 | found_activated = true; | |
545 | break; | |
546 | case rgw_sync_policy_group::Status::ALLOWED: | |
547 | found = true; | |
548 | break; | |
549 | default: | |
550 | break; /* unknown -- ignore */ | |
551 | } | |
552 | } | |
553 | } | |
554 | ||
555 | if (check_activated && found_activated) { | |
556 | return true; | |
557 | } | |
558 | ||
559 | return found; | |
560 | } | |
561 | ||
20effc67 | 562 | void RGWBucketSyncFlowManager::init(const DoutPrefixProvider *dpp, const rgw_sync_policy_info& sync_policy) { |
9f95a23c TL |
563 | std::optional<rgw_sync_data_flow_group> default_flow; |
564 | if (parent) { | |
565 | default_flow.emplace(); | |
566 | default_flow->init_default(parent->all_zones); | |
567 | } | |
568 | ||
569 | for (auto& item : sync_policy.groups) { | |
570 | auto& group = item.second; | |
571 | auto& flow_group_map = flow_groups[group.id]; | |
572 | ||
20effc67 | 573 | flow_group_map.init(dpp, cct, zone_id, bucket, group, |
9f95a23c TL |
574 | (default_flow ? &(*default_flow) : nullptr), |
575 | &all_zones, | |
576 | [&](const rgw_zone_id& source_zone, | |
577 | std::optional<rgw_bucket> source_bucket, | |
578 | const rgw_zone_id& dest_zone, | |
579 | std::optional<rgw_bucket> dest_bucket) { | |
580 | if (!parent) { | |
581 | return true; | |
582 | } | |
583 | return parent->allowed_data_flow(source_zone, | |
584 | source_bucket, | |
585 | dest_zone, | |
586 | dest_bucket, | |
587 | false); /* just check that it's not disabled */ | |
588 | }); | |
589 | } | |
590 | } | |
591 | ||
1e59de90 TL |
592 | /* |
593 | * These are the semantics to be followed while resolving the policy | |
594 | * conflicts - | |
595 | * | |
596 | * ================================================== | |
597 | * zonegroup bucket Result | |
598 | * ================================================== | |
599 | * enabled enabled enabled | |
600 | * allowed enabled | |
601 | * forbidden disabled | |
602 | * allowed enabled enabled | |
603 | * allowed disabled | |
604 | * forbidden disabled | |
605 | * forbidden enabled disabled | |
606 | * allowed disabled | |
607 | * forbidden disabled | |
608 | * | |
609 | * In case multiple group policies are set to reflect for any sync pair | |
610 | * (<source-zone,source-bucket>, <dest-zone,dest-bucket>), the following | |
611 | * rules are applied in the order- | |
612 | * 1) Even if one policy status is FORBIDDEN, the sync will be disabled | |
613 | * 2) Atleast one policy should be ENABLED for the sync to be allowed. | |
614 | * | |
615 | */ | |
20effc67 TL |
616 | void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp, |
617 | std::optional<rgw_bucket> effective_bucket, | |
9f95a23c TL |
618 | RGWBucketSyncFlowManager::pipe_set *source_pipes, |
619 | RGWBucketSyncFlowManager::pipe_set *dest_pipes, | |
620 | bool only_enabled) const | |
621 | ||
622 | { | |
623 | string effective_bucket_key; | |
1e59de90 | 624 | bool is_forbidden = false; |
9f95a23c TL |
625 | if (effective_bucket) { |
626 | effective_bucket_key = effective_bucket->get_key(); | |
627 | } | |
628 | if (parent) { | |
20effc67 | 629 | parent->reflect(dpp, effective_bucket, source_pipes, dest_pipes, only_enabled); |
9f95a23c TL |
630 | } |
631 | ||
632 | for (auto& item : flow_groups) { | |
633 | auto& flow_group_map = item.second; | |
1e59de90 TL |
634 | is_forbidden = false; |
635 | ||
636 | if (flow_group_map.status == rgw_sync_policy_group::Status::FORBIDDEN) { | |
637 | /* FORBIDDEN takes precedence over all the other rules. | |
638 | * Remove any other pipes which may allow access. | |
639 | */ | |
640 | is_forbidden = true; | |
641 | } else if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED && | |
9f95a23c | 642 | (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) { |
1e59de90 | 643 | /* only return enabled groups */ |
9f95a23c TL |
644 | continue; |
645 | } | |
646 | ||
647 | for (auto& entry : flow_group_map.sources) { | |
648 | rgw_sync_bucket_pipe pipe = entry.second; | |
649 | if (!pipe.dest.match_bucket(effective_bucket)) { | |
650 | continue; | |
651 | } | |
652 | ||
653 | pipe.source.apply_bucket(effective_bucket); | |
654 | pipe.dest.apply_bucket(effective_bucket); | |
655 | ||
1e59de90 TL |
656 | if (is_forbidden) { |
657 | ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing source pipe: " << pipe << dendl; | |
658 | source_pipes->disable(pipe); | |
659 | } else { | |
660 | ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl; | |
661 | source_pipes->insert(pipe); | |
662 | } | |
9f95a23c TL |
663 | } |
664 | ||
665 | for (auto& entry : flow_group_map.dests) { | |
666 | rgw_sync_bucket_pipe pipe = entry.second; | |
667 | ||
668 | if (!pipe.source.match_bucket(effective_bucket)) { | |
669 | continue; | |
670 | } | |
671 | ||
672 | pipe.source.apply_bucket(effective_bucket); | |
673 | pipe.dest.apply_bucket(effective_bucket); | |
674 | ||
1e59de90 TL |
675 | if (is_forbidden) { |
676 | ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing dest pipe: " << pipe << dendl; | |
677 | dest_pipes->disable(pipe); | |
678 | } else { | |
679 | ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl; | |
680 | dest_pipes->insert(pipe); | |
681 | } | |
9f95a23c TL |
682 | } |
683 | } | |
684 | } | |
685 | ||
686 | ||
687 | RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext *_cct, | |
688 | const rgw_zone_id& _zone_id, | |
689 | std::optional<rgw_bucket> _bucket, | |
690 | const RGWBucketSyncFlowManager *_parent) : cct(_cct), | |
691 | zone_id(_zone_id), | |
692 | bucket(_bucket), | |
693 | parent(_parent) {} | |
694 | ||
695 | ||
696 | void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc, | |
697 | RGWSI_SyncModules *sync_modules_svc, | |
698 | rgw_sync_policy_info *ppolicy) | |
699 | { | |
700 | bool found = false; | |
701 | ||
702 | rgw_sync_policy_info policy; | |
703 | ||
704 | auto& group = policy.groups["default"]; | |
705 | auto& zonegroup = zone_svc->get_zonegroup(); | |
706 | ||
707 | for (const auto& ziter1 : zonegroup.zones) { | |
708 | auto& id1 = ziter1.first; | |
709 | const RGWZone& z1 = ziter1.second; | |
710 | ||
711 | for (const auto& ziter2 : zonegroup.zones) { | |
712 | auto& id2 = ziter2.first; | |
713 | const RGWZone& z2 = ziter2.second; | |
714 | ||
715 | if (id1 == id2) { | |
716 | continue; | |
717 | } | |
718 | ||
719 | if (z1.syncs_from(z2.name)) { | |
720 | found = true; | |
721 | rgw_sync_directional_rule *rule; | |
722 | group.data_flow.find_or_create_directional(id2, | |
723 | id1, | |
724 | &rule); | |
725 | } | |
726 | } | |
727 | } | |
728 | ||
729 | if (!found) { /* nothing syncs */ | |
730 | return; | |
731 | } | |
732 | ||
733 | rgw_sync_bucket_pipes pipes; | |
734 | pipes.id = "all"; | |
735 | pipes.source.all_zones = true; | |
736 | pipes.dest.all_zones = true; | |
737 | ||
738 | group.pipes.emplace_back(std::move(pipes)); | |
739 | ||
740 | ||
741 | group.status = rgw_sync_policy_group::Status::ENABLED; | |
742 | ||
743 | *ppolicy = std::move(policy); | |
744 | } | |
745 | ||
746 | RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, | |
747 | RGWSI_SyncModules *sync_modules_svc, | |
748 | RGWSI_Bucket_Sync *_bucket_sync_svc, | |
749 | std::optional<rgw_zone_id> effective_zone) : zone_svc(_zone_svc) , | |
750 | bucket_sync_svc(_bucket_sync_svc) { | |
751 | zone_id = effective_zone.value_or(zone_svc->zone_id()); | |
752 | flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), | |
753 | zone_id, | |
754 | nullopt, | |
755 | nullptr)); | |
756 | sync_policy = zone_svc->get_zonegroup().sync_policy; | |
757 | ||
758 | if (sync_policy.empty()) { | |
759 | RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy); | |
760 | legacy_config = true; | |
761 | } | |
762 | } | |
763 | ||
764 | RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, | |
765 | const RGWBucketInfo& _bucket_info, | |
766 | map<string, bufferlist>&& _bucket_attrs) : parent(_parent), | |
767 | bucket_info(_bucket_info), | |
768 | bucket_attrs(std::move(_bucket_attrs)) { | |
769 | if (_bucket_info.sync_policy) { | |
770 | sync_policy = *_bucket_info.sync_policy; | |
771 | ||
772 | for (auto& entry : sync_policy.groups) { | |
773 | for (auto& pipe : entry.second.pipes) { | |
774 | if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER && | |
775 | pipe.params.user.empty()) { | |
776 | pipe.params.user = _bucket_info.owner; | |
777 | } | |
778 | } | |
779 | } | |
780 | } | |
781 | legacy_config = parent->legacy_config; | |
782 | bucket = _bucket_info.bucket; | |
783 | zone_svc = parent->zone_svc; | |
784 | bucket_sync_svc = parent->bucket_sync_svc; | |
785 | flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), | |
786 | parent->zone_id, | |
787 | _bucket_info.bucket, | |
788 | parent->flow_mgr.get())); | |
789 | } | |
790 | ||
791 | RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, | |
792 | const rgw_bucket& _bucket, | |
793 | std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) { | |
794 | if (_sync_policy) { | |
795 | sync_policy = *_sync_policy; | |
796 | } | |
797 | legacy_config = parent->legacy_config; | |
798 | bucket = _bucket; | |
799 | zone_svc = parent->zone_svc; | |
800 | bucket_sync_svc = parent->bucket_sync_svc; | |
801 | flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), | |
802 | parent->zone_id, | |
803 | _bucket, | |
804 | parent->flow_mgr.get())); | |
805 | } | |
806 | ||
807 | RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info, | |
808 | map<string, bufferlist>&& bucket_attrs) const | |
809 | { | |
810 | return new RGWBucketSyncPolicyHandler(this, bucket_info, std::move(bucket_attrs)); | |
811 | } | |
812 | ||
813 | RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket, | |
814 | std::optional<rgw_sync_policy_info> sync_policy) const | |
815 | { | |
816 | return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy); | |
817 | } | |
818 | ||
b3b6e05e | 819 | int RGWBucketSyncPolicyHandler::init(const DoutPrefixProvider *dpp, optional_yield y) |
9f95a23c | 820 | { |
b3b6e05e | 821 | int r = bucket_sync_svc->get_bucket_sync_hints(dpp, bucket.value_or(rgw_bucket()), |
9f95a23c TL |
822 | &source_hints, |
823 | &target_hints, | |
824 | y); | |
825 | if (r < 0) { | |
b3b6e05e | 826 | ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket=" |
9f95a23c TL |
827 | << bucket << " returned r=" << r << dendl; |
828 | return r; | |
829 | } | |
830 | ||
20effc67 | 831 | flow_mgr->init(dpp, sync_policy); |
9f95a23c | 832 | |
20effc67 | 833 | reflect(dpp, &source_pipes, |
9f95a23c TL |
834 | &target_pipes, |
835 | &sources, | |
836 | &targets, | |
837 | &source_zones, | |
838 | &target_zones, | |
839 | true); | |
840 | ||
841 | return 0; | |
842 | } | |
843 | ||
20effc67 | 844 | void RGWBucketSyncPolicyHandler::reflect(const DoutPrefixProvider *dpp, RGWBucketSyncFlowManager::pipe_set *psource_pipes, |
9f95a23c TL |
845 | RGWBucketSyncFlowManager::pipe_set *ptarget_pipes, |
846 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources, | |
847 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets, | |
848 | std::set<rgw_zone_id> *psource_zones, | |
849 | std::set<rgw_zone_id> *ptarget_zones, | |
850 | bool only_enabled) const | |
851 | { | |
852 | RGWBucketSyncFlowManager::pipe_set _source_pipes; | |
853 | RGWBucketSyncFlowManager::pipe_set _target_pipes; | |
854 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _sources; | |
855 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _targets; | |
856 | std::set<rgw_zone_id> _source_zones; | |
857 | std::set<rgw_zone_id> _target_zones; | |
858 | ||
20effc67 | 859 | flow_mgr->reflect(dpp, bucket, &_source_pipes, &_target_pipes, only_enabled); |
9f95a23c TL |
860 | |
861 | for (auto& entry : _source_pipes.pipe_map) { | |
862 | auto& pipe = entry.second; | |
863 | if (!pipe.source.zone) { | |
864 | continue; | |
865 | } | |
866 | _source_zones.insert(*pipe.source.zone); | |
867 | _sources[*pipe.source.zone].insert(pipe); | |
868 | } | |
869 | ||
870 | for (auto& entry : _target_pipes.pipe_map) { | |
871 | auto& pipe = entry.second; | |
872 | if (!pipe.dest.zone) { | |
873 | continue; | |
874 | } | |
875 | _target_zones.insert(*pipe.dest.zone); | |
876 | _targets[*pipe.dest.zone].insert(pipe); | |
877 | } | |
878 | ||
879 | if (psource_pipes) { | |
880 | *psource_pipes = std::move(_source_pipes); | |
881 | } | |
882 | if (ptarget_pipes) { | |
883 | *ptarget_pipes = std::move(_target_pipes); | |
884 | } | |
885 | if (psources) { | |
886 | *psources = std::move(_sources); | |
887 | } | |
888 | if (ptargets) { | |
889 | *ptargets = std::move(_targets); | |
890 | } | |
891 | if (psource_zones) { | |
892 | *psource_zones = std::move(_source_zones); | |
893 | } | |
894 | if (ptarget_zones) { | |
895 | *ptarget_zones = std::move(_target_zones); | |
896 | } | |
897 | } | |
898 | ||
899 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const | |
900 | { | |
901 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; | |
902 | ||
903 | for (auto& source_entry : sources) { | |
904 | auto& zone_id = source_entry.first; | |
905 | ||
906 | auto& pipes = source_entry.second.pipe_map; | |
907 | ||
908 | for (auto& entry : pipes) { | |
909 | auto& pipe = entry.second; | |
910 | m.insert(make_pair(zone_id, pipe)); | |
911 | } | |
912 | } | |
913 | ||
914 | for (auto& pipe : resolved_sources) { | |
915 | if (!pipe.source.zone) { | |
916 | continue; | |
917 | } | |
918 | ||
919 | m.insert(make_pair(*pipe.source.zone, pipe)); | |
920 | } | |
921 | ||
922 | return m; | |
923 | } | |
924 | ||
925 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const | |
926 | { | |
927 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; | |
928 | ||
929 | for (auto& dest_entry : targets) { | |
930 | auto& zone_id = dest_entry.first; | |
931 | ||
932 | auto& pipes = dest_entry.second.pipe_map; | |
933 | ||
934 | for (auto& entry : pipes) { | |
935 | auto& pipe = entry.second; | |
936 | m.insert(make_pair(zone_id, pipe)); | |
937 | } | |
938 | } | |
939 | ||
940 | for (auto& pipe : resolved_dests) { | |
941 | if (!pipe.dest.zone) { | |
942 | continue; | |
943 | } | |
944 | ||
945 | m.insert(make_pair(*pipe.dest.zone, pipe)); | |
946 | } | |
947 | ||
948 | return m; | |
949 | } | |
950 | ||
951 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const | |
952 | { | |
953 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; | |
954 | ||
955 | auto iter = targets.find(zone_id); | |
956 | if (iter != targets.end()) { | |
957 | auto& pipes = iter->second.pipe_map; | |
958 | ||
959 | for (auto& entry : pipes) { | |
960 | auto& pipe = entry.second; | |
961 | m.insert(make_pair(zone_id, pipe)); | |
962 | } | |
963 | } | |
964 | ||
965 | for (auto& pipe : resolved_dests) { | |
966 | if (!pipe.dest.zone || | |
967 | *pipe.dest.zone != zone_id) { | |
968 | continue; | |
969 | } | |
970 | ||
971 | m.insert(make_pair(*pipe.dest.zone, pipe)); | |
972 | } | |
973 | ||
974 | return m; | |
975 | } | |
976 | ||
977 | void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets, | |
978 | std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */ | |
979 | for (auto& entry : source_pipes.pipe_map) { | |
980 | auto& source_pipe = entry.second; | |
981 | if (!filter_peer || | |
982 | source_pipe.source.match(*filter_peer)) { | |
983 | _sources->insert(source_pipe); | |
984 | } | |
985 | } | |
986 | ||
987 | for (auto& entry : target_pipes.pipe_map) { | |
988 | auto& target_pipe = entry.second; | |
989 | if (!filter_peer || | |
990 | target_pipe.dest.match(*filter_peer)) { | |
991 | _targets->insert(target_pipe); | |
992 | } | |
993 | } | |
994 | } | |
995 | ||
996 | bool RGWBucketSyncPolicyHandler::bucket_exports_data() const | |
997 | { | |
998 | if (!bucket) { | |
999 | return false; | |
1000 | } | |
1001 | ||
1e59de90 TL |
1002 | if (!zone_svc->sync_module_exports_data()) { |
1003 | return false; | |
1004 | } | |
1005 | ||
9f95a23c TL |
1006 | if (bucket_is_sync_source()) { |
1007 | return true; | |
1008 | } | |
1009 | ||
1010 | return (zone_svc->need_to_log_data() && | |
1011 | bucket_info->datasync_flag_enabled()); | |
1012 | } | |
1013 | ||
1014 | bool RGWBucketSyncPolicyHandler::bucket_imports_data() const | |
1015 | { | |
1016 | return bucket_is_sync_target(); | |
1017 | } | |
1018 |