]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | |
2 | ||
3 | #include "rgw_common.h" | |
4 | #include "rgw_bucket_sync.h" | |
5 | #include "rgw_data_sync.h" | |
6 | #include "rgw_zone.h" | |
7 | ||
8 | #include "services/svc_zone.h" | |
9 | #include "services/svc_bucket_sync.h" | |
10 | ||
11 | #define dout_subsys ceph_subsys_rgw | |
12 | ||
13 | ||
14 | ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { | |
15 | os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or(rgw_zone_id()) << ",az=" << (int)e.all_zones << "}"; | |
16 | return os; | |
17 | } | |
18 | ||
19 | ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { | |
20 | os << "{s=" << pipe.source << ",d=" << pipe.dest << "}"; | |
21 | return os; | |
22 | } | |
23 | ||
24 | ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) { | |
25 | os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<rgw_zone_id>()) << "}"; | |
26 | return os; | |
27 | } | |
28 | ||
29 | ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) { | |
30 | os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}"; | |
31 | return os; | |
32 | } | |
33 | ||
34 | static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes, | |
35 | const rgw_zone_id& source_zone, | |
36 | const rgw_zone_id& dest_zone) | |
37 | { | |
38 | std::vector<rgw_sync_bucket_pipe> relevant_pipes; | |
39 | for (auto& p : pipes) { | |
40 | if (p.source.match_zone(source_zone) && | |
41 | p.dest.match_zone(dest_zone)) { | |
42 | for (auto pipe : p.expand()) { | |
43 | pipe.source.apply_zone(source_zone); | |
44 | pipe.dest.apply_zone(dest_zone); | |
45 | relevant_pipes.push_back(pipe); | |
46 | } | |
47 | } | |
48 | } | |
49 | ||
50 | return relevant_pipes; | |
51 | } | |
52 | ||
53 | static bool is_wildcard_bucket(const rgw_bucket& bucket) | |
54 | { | |
55 | return bucket.name.empty(); | |
56 | } | |
57 | ||
58 | void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const | |
59 | { | |
60 | encode_json("zone", zone.id, f); | |
61 | encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); | |
62 | encode_json("sources", sources, f); | |
63 | encode_json("dests", dests, f); | |
64 | } | |
65 | ||
66 | ||
67 | template <typename CB1, typename CB2> | |
68 | void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id& source_zone, | |
69 | const rgw_zone_id& dest_zone, | |
70 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
71 | zb_pipe_map_t *pipe_map, | |
72 | CB1 filter_cb, | |
73 | CB2 call_filter_cb) | |
74 | { | |
75 | if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) { | |
76 | return; | |
77 | } | |
78 | auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone); | |
79 | ||
80 | for (auto& pipe : relevant_pipes) { | |
81 | rgw_sync_bucket_entity zb; | |
82 | if (!call_filter_cb(pipe, &zb)) { | |
83 | continue; | |
84 | } | |
85 | pipe_map->insert(make_pair(zb, pipe)); | |
86 | } | |
87 | } | |
88 | ||
89 | template <typename CB> | |
90 | void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id& source_zone, | |
91 | const rgw_zone_id& dest_zone, | |
92 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
93 | CB filter_cb) | |
94 | { | |
95 | return try_add_to_pipe_map(source_zone, dest_zone, pipes, | |
96 | &sources, | |
97 | filter_cb, | |
98 | [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { | |
99 | *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()}; | |
100 | return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket()); | |
101 | }); | |
102 | } | |
103 | ||
104 | template <typename CB> | |
105 | void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id& source_zone, | |
106 | const rgw_zone_id& dest_zone, | |
107 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
108 | CB filter_cb) | |
109 | { | |
110 | return try_add_to_pipe_map(source_zone, dest_zone, pipes, | |
111 | &dests, | |
112 | filter_cb, | |
113 | [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { | |
114 | *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()}; | |
115 | return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket); | |
116 | }); | |
117 | } | |
118 | ||
119 | using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t; | |
120 | ||
121 | pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m, | |
122 | const rgw_zone_id& zone, | |
123 | std::optional<rgw_bucket> b) const | |
124 | { | |
125 | if (!b) { | |
126 | return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()}); | |
127 | } | |
128 | ||
129 | auto zb = rgw_sync_bucket_entity{zone, *b}; | |
130 | ||
131 | auto range = m.equal_range(zb); | |
132 | if (range.first == range.second && | |
133 | !is_wildcard_bucket(*b)) { | |
134 | /* couldn't find the specific bucket, try to find by wildcard */ | |
135 | zb.bucket = rgw_bucket(); | |
136 | range = m.equal_range(zb); | |
137 | } | |
138 | ||
139 | return range; | |
140 | } | |
141 | ||
142 | ||
143 | template <typename CB> | |
144 | void rgw_sync_group_pipe_map::init(CephContext *cct, | |
145 | const rgw_zone_id& _zone, | |
146 | std::optional<rgw_bucket> _bucket, | |
147 | const rgw_sync_policy_group& group, | |
148 | rgw_sync_data_flow_group *_default_flow, | |
149 | std::set<rgw_zone_id> *_pall_zones, | |
150 | CB filter_cb) { | |
151 | zone = _zone; | |
152 | bucket = _bucket; | |
153 | default_flow = _default_flow; | |
154 | pall_zones = _pall_zones; | |
155 | ||
156 | rgw_sync_bucket_entity zb(zone, bucket); | |
157 | ||
158 | status = group.status; | |
159 | ||
160 | std::vector<rgw_sync_bucket_pipes> zone_pipes; | |
161 | ||
162 | string bucket_key = (bucket ? bucket->get_key() : "*"); | |
163 | ||
164 | /* only look at pipes that touch the specific zone and bucket */ | |
165 | for (auto& pipe : group.pipes) { | |
166 | if (pipe.contains_zone_bucket(zone, bucket)) { | |
167 | ldout(cct, 20) << __func__ << "(): pipe_map (zone=" << zone << " bucket=" << bucket_key << "): adding potential pipe: " << pipe << dendl; | |
168 | zone_pipes.push_back(pipe); | |
169 | } | |
170 | } | |
171 | ||
172 | const rgw_sync_data_flow_group *pflow; | |
173 | ||
174 | if (!group.data_flow.empty()) { | |
175 | pflow = &group.data_flow; | |
176 | } else { | |
177 | if (!default_flow) { | |
178 | return; | |
179 | } | |
180 | pflow = default_flow; | |
181 | } | |
182 | ||
183 | auto& flow = *pflow; | |
184 | ||
185 | pall_zones->insert(zone); | |
186 | ||
187 | /* symmetrical */ | |
188 | for (auto& symmetrical_group : flow.symmetrical) { | |
189 | if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) { | |
190 | for (auto& z : symmetrical_group.zones) { | |
191 | if (z != zone) { | |
192 | pall_zones->insert(z); | |
193 | try_add_source(z, zone, zone_pipes, filter_cb); | |
194 | try_add_dest(zone, z, zone_pipes, filter_cb); | |
195 | } | |
196 | } | |
197 | } | |
198 | } | |
199 | ||
200 | /* directional */ | |
201 | for (auto& rule : flow.directional) { | |
202 | if (rule.source_zone == zone) { | |
203 | pall_zones->insert(rule.dest_zone); | |
204 | try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb); | |
205 | } else if (rule.dest_zone == zone) { | |
206 | pall_zones->insert(rule.source_zone); | |
207 | try_add_source(rule.source_zone, zone, zone_pipes, filter_cb); | |
208 | } | |
209 | } | |
210 | } | |
211 | ||
212 | /* | |
213 | * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} | |
214 | */ | |
215 | vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id& source_zone, | |
216 | std::optional<rgw_bucket> source_bucket, | |
217 | std::optional<rgw_bucket> dest_bucket) const { | |
218 | vector<rgw_sync_bucket_pipe> result; | |
219 | ||
220 | auto range = find_pipes(sources, source_zone, source_bucket); | |
221 | ||
222 | for (auto iter = range.first; iter != range.second; ++iter) { | |
223 | auto pipe = iter->second; | |
224 | if (pipe.dest.match_bucket(dest_bucket)) { | |
225 | result.push_back(pipe); | |
226 | } | |
227 | } | |
228 | return result; | |
229 | } | |
230 | ||
231 | /* | |
232 | * find all relevant pipes in other zones that pull from a specific | |
233 | * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} | |
234 | */ | |
235 | vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket, | |
236 | const rgw_zone_id& dest_zone, | |
237 | std::optional<rgw_bucket> dest_bucket) const { | |
238 | vector<rgw_sync_bucket_pipe> result; | |
239 | ||
240 | auto range = find_pipes(dests, dest_zone, dest_bucket); | |
241 | ||
242 | for (auto iter = range.first; iter != range.second; ++iter) { | |
243 | auto pipe = iter->second; | |
244 | if (pipe.source.match_bucket(source_bucket)) { | |
245 | result.push_back(pipe); | |
246 | } | |
247 | } | |
248 | ||
249 | return result; | |
250 | } | |
251 | ||
252 | /* | |
253 | * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} | |
254 | */ | |
255 | vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id& source_zone, | |
256 | std::optional<rgw_bucket> source_bucket, | |
257 | const rgw_zone_id& dest_zone, | |
258 | std::optional<rgw_bucket> dest_bucket) const { | |
259 | if (dest_zone == zone) { | |
260 | return find_source_pipes(source_zone, source_bucket, dest_bucket); | |
261 | } | |
262 | ||
263 | if (source_zone == zone) { | |
264 | return find_dest_pipes(source_bucket, dest_zone, dest_bucket); | |
265 | } | |
266 | ||
267 | return vector<rgw_sync_bucket_pipe>(); | |
268 | } | |
269 | ||
270 | void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe) | |
271 | { | |
272 | pipes.push_back(pipe); | |
273 | ||
274 | auto ppipe = &pipes.back(); | |
275 | auto prefix = ppipe->params.source.filter.prefix.value_or(string()); | |
276 | ||
277 | prefix_refs.insert(make_pair(prefix, ppipe)); | |
278 | ||
279 | for (auto& t : ppipe->params.source.filter.tags) { | |
280 | string tag = t.key + "=" + t.value; | |
281 | auto titer = tag_refs.find(tag); | |
282 | if (titer != tag_refs.end() && | |
283 | ppipe->params.priority > titer->second->params.priority) { | |
284 | titer->second = ppipe; | |
285 | } else { | |
286 | tag_refs[tag] = ppipe; | |
287 | } | |
288 | } | |
289 | } | |
290 | ||
291 | bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key, | |
292 | std::optional<rgw_user> *user, | |
293 | std::optional<rgw_user> *acl_translation_owner, | |
294 | std::optional<string> *storage_class, | |
295 | rgw_sync_pipe_params::Mode *mode, | |
296 | bool *need_more_info) const | |
297 | { | |
298 | std::optional<string> owner; | |
299 | ||
300 | *need_more_info = false; | |
301 | ||
302 | if (prefix_refs.empty()) { | |
303 | return false; | |
304 | } | |
305 | ||
306 | auto end = prefix_refs.upper_bound(key.name); | |
307 | auto iter = end; | |
308 | if (iter != prefix_refs.begin()) { | |
309 | --iter; | |
310 | } | |
311 | if (iter == prefix_refs.end()) { | |
312 | return false; | |
313 | } | |
314 | ||
315 | if (iter != prefix_refs.begin()) { | |
316 | iter = prefix_refs.find(iter->first); /* prefix_refs is multimap, find first element | |
317 | holding that key */ | |
318 | } | |
319 | ||
320 | std::vector<decltype(iter)> iters; | |
321 | ||
322 | std::optional<int> priority; | |
323 | ||
324 | for (; iter != end; ++iter) { | |
325 | auto& prefix = iter->first; | |
326 | if (!boost::starts_with(key.name, prefix)) { | |
327 | continue; | |
328 | } | |
329 | ||
330 | auto& rule_params = iter->second->params; | |
331 | auto& filter = rule_params.source.filter; | |
332 | ||
333 | if (rule_params.priority > priority) { | |
334 | priority = rule_params.priority; | |
335 | ||
336 | if (!filter.has_tags()) { | |
337 | iters.clear(); | |
338 | } | |
339 | iters.push_back(iter); | |
340 | ||
341 | *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then | |
342 | we can't be sure if it would be used. | |
343 | We need to first read the info from the source object */ | |
344 | } | |
345 | } | |
346 | ||
347 | if (iters.empty()) { | |
348 | return false; | |
349 | } | |
350 | ||
351 | bool conflict = false; | |
352 | ||
353 | std::optional<rgw_user> _user; | |
354 | std::optional<rgw_sync_pipe_acl_translation> _acl_translation; | |
355 | std::optional<string> _storage_class; | |
356 | rgw_sync_pipe_params::Mode _mode; | |
357 | ||
358 | int i = 0; | |
359 | for (auto& iter : iters) { | |
360 | auto& rule_params = iter->second->params; | |
361 | if (++i == 0) { | |
362 | _user = rule_params.user; | |
363 | _acl_translation = rule_params.dest.acl_translation; | |
364 | _storage_class = rule_params.dest.storage_class; | |
365 | _mode = rule_params.mode; | |
366 | continue; | |
367 | } | |
368 | ||
369 | conflict = !(_user == rule_params.user && | |
370 | _acl_translation == rule_params.dest.acl_translation && | |
371 | _storage_class == rule_params.dest.storage_class && | |
372 | _mode == rule_params.mode); | |
373 | if (conflict) { | |
374 | *need_more_info = true; | |
375 | return false; | |
376 | } | |
377 | } | |
378 | ||
379 | *user = _user; | |
380 | if (_acl_translation) { | |
381 | *acl_translation_owner = _acl_translation->owner; | |
382 | } | |
383 | *storage_class = _storage_class; | |
384 | *mode = _mode; | |
385 | ||
386 | return true; | |
387 | } | |
388 | ||
389 | bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key, | |
390 | const RGWObjTags::tag_map_t& tags, | |
391 | rgw_sync_pipe_params *params) const | |
392 | { | |
393 | if (prefix_refs.empty()) { | |
394 | return false; | |
395 | } | |
396 | ||
397 | auto iter = prefix_refs.upper_bound(key.name); | |
398 | if (iter != prefix_refs.begin()) { | |
399 | --iter; | |
400 | } | |
401 | if (iter == prefix_refs.end()) { | |
402 | return false; | |
403 | } | |
404 | ||
405 | auto end = prefix_refs.upper_bound(key.name); | |
406 | auto max = end; | |
407 | ||
408 | std::optional<int> priority; | |
409 | ||
410 | for (; iter != end; ++iter) { | |
411 | /* NOTE: this is not the most efficient way to do it, | |
412 | * a trie data structure would be better | |
413 | */ | |
414 | auto& prefix = iter->first; | |
415 | if (!boost::starts_with(key.name, prefix)) { | |
416 | continue; | |
417 | } | |
418 | ||
419 | auto& rule_params = iter->second->params; | |
420 | auto& filter = rule_params.source.filter; | |
421 | ||
422 | if (!filter.check_tags(tags)) { | |
423 | continue; | |
424 | } | |
425 | ||
426 | if (rule_params.priority > priority) { | |
427 | priority = rule_params.priority; | |
428 | max = iter; | |
429 | } | |
430 | } | |
431 | ||
432 | if (max == end) { | |
433 | return false; | |
434 | } | |
435 | ||
436 | *params = max->second->params; | |
437 | return true; | |
438 | } | |
439 | ||
440 | /* | |
441 | * return either the current prefix for s, or the next one if s is not within a prefix | |
442 | */ | |
443 | ||
444 | RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const | |
445 | { | |
446 | if (prefix_refs.empty()) { | |
447 | return prefix_refs.end(); | |
448 | } | |
449 | auto next = prefix_refs.upper_bound(s); | |
450 | auto iter = next; | |
451 | if (iter != prefix_refs.begin()) { | |
452 | --iter; | |
453 | } | |
454 | if (!boost::starts_with(s, iter->first)) { | |
455 | return next; | |
456 | } | |
457 | ||
458 | return iter; | |
459 | } | |
460 | ||
461 | void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) { | |
462 | pipe_map.insert(make_pair(pipe.id, pipe)); | |
463 | ||
464 | auto& rules_ref = rules[endpoints_pair(pipe)]; | |
465 | ||
466 | if (!rules_ref) { | |
467 | rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>(); | |
468 | } | |
469 | ||
470 | rules_ref->insert(pipe); | |
471 | ||
472 | pipe_handler h(rules_ref, pipe); | |
473 | ||
474 | handlers.insert(h); | |
475 | } | |
476 | ||
477 | void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const | |
478 | { | |
479 | encode_json("pipes", pipe_map, f); | |
480 | } | |
481 | ||
482 | bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id& source_zone, | |
483 | std::optional<rgw_bucket> source_bucket, | |
484 | const rgw_zone_id& dest_zone, | |
485 | std::optional<rgw_bucket> dest_bucket, | |
486 | bool check_activated) const | |
487 | { | |
488 | bool found = false; | |
489 | bool found_activated = false; | |
490 | ||
491 | for (auto m : flow_groups) { | |
492 | auto& fm = m.second; | |
493 | auto pipes = fm.find_pipes(source_zone, source_bucket, | |
494 | dest_zone, dest_bucket); | |
495 | ||
496 | bool is_found = !pipes.empty(); | |
497 | ||
498 | if (is_found) { | |
499 | switch (fm.status) { | |
500 | case rgw_sync_policy_group::Status::FORBIDDEN: | |
501 | return false; | |
502 | case rgw_sync_policy_group::Status::ENABLED: | |
503 | found = true; | |
504 | found_activated = true; | |
505 | break; | |
506 | case rgw_sync_policy_group::Status::ALLOWED: | |
507 | found = true; | |
508 | break; | |
509 | default: | |
510 | break; /* unknown -- ignore */ | |
511 | } | |
512 | } | |
513 | } | |
514 | ||
515 | if (check_activated && found_activated) { | |
516 | return true; | |
517 | } | |
518 | ||
519 | return found; | |
520 | } | |
521 | ||
522 | void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { | |
523 | std::optional<rgw_sync_data_flow_group> default_flow; | |
524 | if (parent) { | |
525 | default_flow.emplace(); | |
526 | default_flow->init_default(parent->all_zones); | |
527 | } | |
528 | ||
529 | for (auto& item : sync_policy.groups) { | |
530 | auto& group = item.second; | |
531 | auto& flow_group_map = flow_groups[group.id]; | |
532 | ||
533 | flow_group_map.init(cct, zone_id, bucket, group, | |
534 | (default_flow ? &(*default_flow) : nullptr), | |
535 | &all_zones, | |
536 | [&](const rgw_zone_id& source_zone, | |
537 | std::optional<rgw_bucket> source_bucket, | |
538 | const rgw_zone_id& dest_zone, | |
539 | std::optional<rgw_bucket> dest_bucket) { | |
540 | if (!parent) { | |
541 | return true; | |
542 | } | |
543 | return parent->allowed_data_flow(source_zone, | |
544 | source_bucket, | |
545 | dest_zone, | |
546 | dest_bucket, | |
547 | false); /* just check that it's not disabled */ | |
548 | }); | |
549 | } | |
550 | } | |
551 | ||
552 | void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket, | |
553 | RGWBucketSyncFlowManager::pipe_set *source_pipes, | |
554 | RGWBucketSyncFlowManager::pipe_set *dest_pipes, | |
555 | bool only_enabled) const | |
556 | ||
557 | { | |
558 | string effective_bucket_key; | |
559 | if (effective_bucket) { | |
560 | effective_bucket_key = effective_bucket->get_key(); | |
561 | } | |
562 | if (parent) { | |
563 | parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled); | |
564 | } | |
565 | ||
566 | for (auto& item : flow_groups) { | |
567 | auto& flow_group_map = item.second; | |
568 | ||
569 | /* only return enabled groups */ | |
570 | if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED && | |
571 | (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) { | |
572 | continue; | |
573 | } | |
574 | ||
575 | for (auto& entry : flow_group_map.sources) { | |
576 | rgw_sync_bucket_pipe pipe = entry.second; | |
577 | if (!pipe.dest.match_bucket(effective_bucket)) { | |
578 | continue; | |
579 | } | |
580 | ||
581 | pipe.source.apply_bucket(effective_bucket); | |
582 | pipe.dest.apply_bucket(effective_bucket); | |
583 | ||
584 | ldout(cct, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl; | |
585 | source_pipes->insert(pipe); | |
586 | } | |
587 | ||
588 | for (auto& entry : flow_group_map.dests) { | |
589 | rgw_sync_bucket_pipe pipe = entry.second; | |
590 | ||
591 | if (!pipe.source.match_bucket(effective_bucket)) { | |
592 | continue; | |
593 | } | |
594 | ||
595 | pipe.source.apply_bucket(effective_bucket); | |
596 | pipe.dest.apply_bucket(effective_bucket); | |
597 | ||
598 | ldout(cct, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl; | |
599 | dest_pipes->insert(pipe); | |
600 | } | |
601 | } | |
602 | } | |
603 | ||
604 | ||
605 | RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext *_cct, | |
606 | const rgw_zone_id& _zone_id, | |
607 | std::optional<rgw_bucket> _bucket, | |
608 | const RGWBucketSyncFlowManager *_parent) : cct(_cct), | |
609 | zone_id(_zone_id), | |
610 | bucket(_bucket), | |
611 | parent(_parent) {} | |
612 | ||
613 | ||
614 | void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc, | |
615 | RGWSI_SyncModules *sync_modules_svc, | |
616 | rgw_sync_policy_info *ppolicy) | |
617 | { | |
618 | bool found = false; | |
619 | ||
620 | rgw_sync_policy_info policy; | |
621 | ||
622 | auto& group = policy.groups["default"]; | |
623 | auto& zonegroup = zone_svc->get_zonegroup(); | |
624 | ||
625 | for (const auto& ziter1 : zonegroup.zones) { | |
626 | auto& id1 = ziter1.first; | |
627 | const RGWZone& z1 = ziter1.second; | |
628 | ||
629 | for (const auto& ziter2 : zonegroup.zones) { | |
630 | auto& id2 = ziter2.first; | |
631 | const RGWZone& z2 = ziter2.second; | |
632 | ||
633 | if (id1 == id2) { | |
634 | continue; | |
635 | } | |
636 | ||
637 | if (z1.syncs_from(z2.name)) { | |
638 | found = true; | |
639 | rgw_sync_directional_rule *rule; | |
640 | group.data_flow.find_or_create_directional(id2, | |
641 | id1, | |
642 | &rule); | |
643 | } | |
644 | } | |
645 | } | |
646 | ||
647 | if (!found) { /* nothing syncs */ | |
648 | return; | |
649 | } | |
650 | ||
651 | rgw_sync_bucket_pipes pipes; | |
652 | pipes.id = "all"; | |
653 | pipes.source.all_zones = true; | |
654 | pipes.dest.all_zones = true; | |
655 | ||
656 | group.pipes.emplace_back(std::move(pipes)); | |
657 | ||
658 | ||
659 | group.status = rgw_sync_policy_group::Status::ENABLED; | |
660 | ||
661 | *ppolicy = std::move(policy); | |
662 | } | |
663 | ||
664 | RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, | |
665 | RGWSI_SyncModules *sync_modules_svc, | |
666 | RGWSI_Bucket_Sync *_bucket_sync_svc, | |
667 | std::optional<rgw_zone_id> effective_zone) : zone_svc(_zone_svc) , | |
668 | bucket_sync_svc(_bucket_sync_svc) { | |
669 | zone_id = effective_zone.value_or(zone_svc->zone_id()); | |
670 | flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), | |
671 | zone_id, | |
672 | nullopt, | |
673 | nullptr)); | |
674 | sync_policy = zone_svc->get_zonegroup().sync_policy; | |
675 | ||
676 | if (sync_policy.empty()) { | |
677 | RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy); | |
678 | legacy_config = true; | |
679 | } | |
680 | } | |
681 | ||
682 | RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, | |
683 | const RGWBucketInfo& _bucket_info, | |
684 | map<string, bufferlist>&& _bucket_attrs) : parent(_parent), | |
685 | bucket_info(_bucket_info), | |
686 | bucket_attrs(std::move(_bucket_attrs)) { | |
687 | if (_bucket_info.sync_policy) { | |
688 | sync_policy = *_bucket_info.sync_policy; | |
689 | ||
690 | for (auto& entry : sync_policy.groups) { | |
691 | for (auto& pipe : entry.second.pipes) { | |
692 | if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER && | |
693 | pipe.params.user.empty()) { | |
694 | pipe.params.user = _bucket_info.owner; | |
695 | } | |
696 | } | |
697 | } | |
698 | } | |
699 | legacy_config = parent->legacy_config; | |
700 | bucket = _bucket_info.bucket; | |
701 | zone_svc = parent->zone_svc; | |
702 | bucket_sync_svc = parent->bucket_sync_svc; | |
703 | flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), | |
704 | parent->zone_id, | |
705 | _bucket_info.bucket, | |
706 | parent->flow_mgr.get())); | |
707 | } | |
708 | ||
709 | RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, | |
710 | const rgw_bucket& _bucket, | |
711 | std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) { | |
712 | if (_sync_policy) { | |
713 | sync_policy = *_sync_policy; | |
714 | } | |
715 | legacy_config = parent->legacy_config; | |
716 | bucket = _bucket; | |
717 | zone_svc = parent->zone_svc; | |
718 | bucket_sync_svc = parent->bucket_sync_svc; | |
719 | flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), | |
720 | parent->zone_id, | |
721 | _bucket, | |
722 | parent->flow_mgr.get())); | |
723 | } | |
724 | ||
725 | RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info, | |
726 | map<string, bufferlist>&& bucket_attrs) const | |
727 | { | |
728 | return new RGWBucketSyncPolicyHandler(this, bucket_info, std::move(bucket_attrs)); | |
729 | } | |
730 | ||
731 | RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket, | |
732 | std::optional<rgw_sync_policy_info> sync_policy) const | |
733 | { | |
734 | return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy); | |
735 | } | |
736 | ||
737 | int RGWBucketSyncPolicyHandler::init(optional_yield y) | |
738 | { | |
739 | int r = bucket_sync_svc->get_bucket_sync_hints(bucket.value_or(rgw_bucket()), | |
740 | &source_hints, | |
741 | &target_hints, | |
742 | y); | |
743 | if (r < 0) { | |
744 | ldout(bucket_sync_svc->ctx(), 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket=" | |
745 | << bucket << " returned r=" << r << dendl; | |
746 | return r; | |
747 | } | |
748 | ||
749 | flow_mgr->init(sync_policy); | |
750 | ||
751 | reflect(&source_pipes, | |
752 | &target_pipes, | |
753 | &sources, | |
754 | &targets, | |
755 | &source_zones, | |
756 | &target_zones, | |
757 | true); | |
758 | ||
759 | return 0; | |
760 | } | |
761 | ||
762 | void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psource_pipes, | |
763 | RGWBucketSyncFlowManager::pipe_set *ptarget_pipes, | |
764 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources, | |
765 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets, | |
766 | std::set<rgw_zone_id> *psource_zones, | |
767 | std::set<rgw_zone_id> *ptarget_zones, | |
768 | bool only_enabled) const | |
769 | { | |
770 | RGWBucketSyncFlowManager::pipe_set _source_pipes; | |
771 | RGWBucketSyncFlowManager::pipe_set _target_pipes; | |
772 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _sources; | |
773 | map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _targets; | |
774 | std::set<rgw_zone_id> _source_zones; | |
775 | std::set<rgw_zone_id> _target_zones; | |
776 | ||
777 | flow_mgr->reflect(bucket, &_source_pipes, &_target_pipes, only_enabled); | |
778 | ||
779 | for (auto& entry : _source_pipes.pipe_map) { | |
780 | auto& pipe = entry.second; | |
781 | if (!pipe.source.zone) { | |
782 | continue; | |
783 | } | |
784 | _source_zones.insert(*pipe.source.zone); | |
785 | _sources[*pipe.source.zone].insert(pipe); | |
786 | } | |
787 | ||
788 | for (auto& entry : _target_pipes.pipe_map) { | |
789 | auto& pipe = entry.second; | |
790 | if (!pipe.dest.zone) { | |
791 | continue; | |
792 | } | |
793 | _target_zones.insert(*pipe.dest.zone); | |
794 | _targets[*pipe.dest.zone].insert(pipe); | |
795 | } | |
796 | ||
797 | if (psource_pipes) { | |
798 | *psource_pipes = std::move(_source_pipes); | |
799 | } | |
800 | if (ptarget_pipes) { | |
801 | *ptarget_pipes = std::move(_target_pipes); | |
802 | } | |
803 | if (psources) { | |
804 | *psources = std::move(_sources); | |
805 | } | |
806 | if (ptargets) { | |
807 | *ptargets = std::move(_targets); | |
808 | } | |
809 | if (psource_zones) { | |
810 | *psource_zones = std::move(_source_zones); | |
811 | } | |
812 | if (ptarget_zones) { | |
813 | *ptarget_zones = std::move(_target_zones); | |
814 | } | |
815 | } | |
816 | ||
817 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const | |
818 | { | |
819 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; | |
820 | ||
821 | for (auto& source_entry : sources) { | |
822 | auto& zone_id = source_entry.first; | |
823 | ||
824 | auto& pipes = source_entry.second.pipe_map; | |
825 | ||
826 | for (auto& entry : pipes) { | |
827 | auto& pipe = entry.second; | |
828 | m.insert(make_pair(zone_id, pipe)); | |
829 | } | |
830 | } | |
831 | ||
832 | for (auto& pipe : resolved_sources) { | |
833 | if (!pipe.source.zone) { | |
834 | continue; | |
835 | } | |
836 | ||
837 | m.insert(make_pair(*pipe.source.zone, pipe)); | |
838 | } | |
839 | ||
840 | return m; | |
841 | } | |
842 | ||
843 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const | |
844 | { | |
845 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; | |
846 | ||
847 | for (auto& dest_entry : targets) { | |
848 | auto& zone_id = dest_entry.first; | |
849 | ||
850 | auto& pipes = dest_entry.second.pipe_map; | |
851 | ||
852 | for (auto& entry : pipes) { | |
853 | auto& pipe = entry.second; | |
854 | m.insert(make_pair(zone_id, pipe)); | |
855 | } | |
856 | } | |
857 | ||
858 | for (auto& pipe : resolved_dests) { | |
859 | if (!pipe.dest.zone) { | |
860 | continue; | |
861 | } | |
862 | ||
863 | m.insert(make_pair(*pipe.dest.zone, pipe)); | |
864 | } | |
865 | ||
866 | return m; | |
867 | } | |
868 | ||
869 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const | |
870 | { | |
871 | multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; | |
872 | ||
873 | auto iter = targets.find(zone_id); | |
874 | if (iter != targets.end()) { | |
875 | auto& pipes = iter->second.pipe_map; | |
876 | ||
877 | for (auto& entry : pipes) { | |
878 | auto& pipe = entry.second; | |
879 | m.insert(make_pair(zone_id, pipe)); | |
880 | } | |
881 | } | |
882 | ||
883 | for (auto& pipe : resolved_dests) { | |
884 | if (!pipe.dest.zone || | |
885 | *pipe.dest.zone != zone_id) { | |
886 | continue; | |
887 | } | |
888 | ||
889 | m.insert(make_pair(*pipe.dest.zone, pipe)); | |
890 | } | |
891 | ||
892 | return m; | |
893 | } | |
894 | ||
895 | void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets, | |
896 | std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */ | |
897 | for (auto& entry : source_pipes.pipe_map) { | |
898 | auto& source_pipe = entry.second; | |
899 | if (!filter_peer || | |
900 | source_pipe.source.match(*filter_peer)) { | |
901 | _sources->insert(source_pipe); | |
902 | } | |
903 | } | |
904 | ||
905 | for (auto& entry : target_pipes.pipe_map) { | |
906 | auto& target_pipe = entry.second; | |
907 | if (!filter_peer || | |
908 | target_pipe.dest.match(*filter_peer)) { | |
909 | _targets->insert(target_pipe); | |
910 | } | |
911 | } | |
912 | } | |
913 | ||
914 | bool RGWBucketSyncPolicyHandler::bucket_exports_data() const | |
915 | { | |
916 | if (!bucket) { | |
917 | return false; | |
918 | } | |
919 | ||
920 | if (bucket_is_sync_source()) { | |
921 | return true; | |
922 | } | |
923 | ||
924 | return (zone_svc->need_to_log_data() && | |
925 | bucket_info->datasync_flag_enabled()); | |
926 | } | |
927 | ||
928 | bool RGWBucketSyncPolicyHandler::bucket_imports_data() const | |
929 | { | |
930 | return bucket_is_sync_target(); | |
931 | } | |
932 |