]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | |
2 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
3 | // vim: ts=8 sw=2 smarttab | |
4 | ||
5 | /* | |
6 | * Ceph - scalable distributed file system | |
7 | * | |
8 | * Copyright (C) 2018 Red Hat, Inc. | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #pragma once | |
18 | ||
19 | #include "rgw_common.h" | |
20 | #include "rgw_sync_policy.h" | |
21 | ||
22 | class RGWSI_Zone; | |
23 | class RGWSI_SyncModules; | |
24 | class RGWSI_Bucket_Sync; | |
25 | ||
26 | struct rgw_sync_group_pipe_map; | |
27 | struct rgw_sync_bucket_pipes; | |
28 | struct rgw_sync_policy_info; | |
29 | ||
30 | struct rgw_sync_group_pipe_map { | |
31 | rgw_zone_id zone; | |
32 | std::optional<rgw_bucket> bucket; | |
33 | ||
34 | rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN}; | |
35 | ||
36 | using zb_pipe_map_t = std::multimap<rgw_sync_bucket_entity, rgw_sync_bucket_pipe>; | |
37 | ||
38 | zb_pipe_map_t sources; /* all the pipes where zone is pulling from */ | |
39 | zb_pipe_map_t dests; /* all the pipes that pull from zone */ | |
40 | ||
41 | std::set<rgw_zone_id> *pall_zones{nullptr}; | |
42 | rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it, | |
43 | used in the case of bucket sync policy, not at the | |
44 | zonegroup level */ | |
45 | ||
46 | void dump(ceph::Formatter *f) const; | |
47 | ||
48 | template <typename CB1, typename CB2> | |
49 | void try_add_to_pipe_map(const rgw_zone_id& source_zone, | |
50 | const rgw_zone_id& dest_zone, | |
51 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
52 | zb_pipe_map_t *pipe_map, | |
53 | CB1 filter_cb, | |
54 | CB2 call_filter_cb); | |
55 | ||
56 | template <typename CB> | |
57 | void try_add_source(const rgw_zone_id& source_zone, | |
58 | const rgw_zone_id& dest_zone, | |
59 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
60 | CB filter_cb); | |
61 | ||
62 | template <typename CB> | |
63 | void try_add_dest(const rgw_zone_id& source_zone, | |
64 | const rgw_zone_id& dest_zone, | |
65 | const std::vector<rgw_sync_bucket_pipes>& pipes, | |
66 | CB filter_cb); | |
67 | ||
20effc67 | 68 | std::pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m, |
9f95a23c TL |
69 | const rgw_zone_id& zone, |
70 | std::optional<rgw_bucket> b) const; | |
71 | ||
72 | template <typename CB> | |
20effc67 | 73 | void init(const DoutPrefixProvider *dpp, CephContext *cct, |
9f95a23c TL |
74 | const rgw_zone_id& _zone, |
75 | std::optional<rgw_bucket> _bucket, | |
76 | const rgw_sync_policy_group& group, | |
77 | rgw_sync_data_flow_group *_default_flow, | |
78 | std::set<rgw_zone_id> *_pall_zones, | |
79 | CB filter_cb); | |
80 | ||
81 | /* | |
82 | * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} | |
83 | */ | |
20effc67 | 84 | std::vector<rgw_sync_bucket_pipe> find_source_pipes(const rgw_zone_id& source_zone, |
9f95a23c TL |
85 | std::optional<rgw_bucket> source_bucket, |
86 | std::optional<rgw_bucket> dest_bucket) const; | |
87 | ||
88 | /* | |
89 | * find all relevant pipes in other zones that pull from a specific | |
90 | * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} | |
91 | */ | |
20effc67 | 92 | std::vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket, |
9f95a23c TL |
93 | const rgw_zone_id& dest_zone, |
94 | std::optional<rgw_bucket> dest_bucket) const; | |
95 | ||
96 | /* | |
97 | * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} | |
98 | */ | |
20effc67 | 99 | std::vector<rgw_sync_bucket_pipe> find_pipes(const rgw_zone_id& source_zone, |
9f95a23c TL |
100 | std::optional<rgw_bucket> source_bucket, |
101 | const rgw_zone_id& dest_zone, | |
102 | std::optional<rgw_bucket> dest_bucket) const; | |
103 | }; | |
104 | ||
105 | class RGWSyncPolicyCompat { | |
106 | public: | |
107 | static void convert_old_sync_config(RGWSI_Zone *zone_svc, | |
108 | RGWSI_SyncModules *sync_modules_svc, | |
109 | rgw_sync_policy_info *ppolicy); | |
110 | }; | |
111 | ||
112 | class RGWBucketSyncFlowManager { | |
113 | friend class RGWBucketSyncPolicyHandler; | |
114 | public: | |
115 | struct endpoints_pair { | |
116 | rgw_sync_bucket_entity source; | |
117 | rgw_sync_bucket_entity dest; | |
118 | ||
119 | endpoints_pair() {} | |
120 | endpoints_pair(const rgw_sync_bucket_pipe& pipe) { | |
121 | source = pipe.source; | |
122 | dest = pipe.dest; | |
123 | } | |
124 | ||
125 | bool operator<(const endpoints_pair& e) const { | |
126 | if (source < e.source) { | |
127 | return true; | |
128 | } | |
129 | if (e.source < source) { | |
130 | return false; | |
131 | } | |
132 | return (dest < e.dest); | |
133 | } | |
134 | }; | |
135 | ||
136 | /* | |
137 | * pipe_rules: deal with a set of pipes that have common endpoints_pair | |
138 | */ | |
139 | class pipe_rules { | |
140 | std::list<rgw_sync_bucket_pipe> pipes; | |
141 | ||
142 | public: | |
20effc67 | 143 | using prefix_map_t = std::multimap<std::string, rgw_sync_bucket_pipe *>; |
9f95a23c | 144 | |
20effc67 | 145 | std::map<std::string, rgw_sync_bucket_pipe *> tag_refs; |
9f95a23c TL |
146 | prefix_map_t prefix_refs; |
147 | ||
148 | void insert(const rgw_sync_bucket_pipe& pipe); | |
149 | ||
150 | bool find_basic_info_without_tags(const rgw_obj_key& key, | |
151 | std::optional<rgw_user> *user, | |
152 | std::optional<rgw_user> *acl_translation, | |
20effc67 | 153 | std::optional<std::string> *storage_class, |
9f95a23c TL |
154 | rgw_sync_pipe_params::Mode *mode, |
155 | bool *need_more_info) const; | |
156 | bool find_obj_params(const rgw_obj_key& key, | |
157 | const RGWObjTags::tag_map_t& tags, | |
158 | rgw_sync_pipe_params *params) const; | |
159 | ||
20effc67 | 160 | void scan_prefixes(std::vector<std::string> *prefixes) const; |
9f95a23c TL |
161 | |
162 | prefix_map_t::const_iterator prefix_begin() const { | |
163 | return prefix_refs.begin(); | |
164 | } | |
165 | prefix_map_t::const_iterator prefix_search(const std::string& s) const; | |
166 | prefix_map_t::const_iterator prefix_end() const { | |
167 | return prefix_refs.end(); | |
168 | } | |
169 | }; | |
170 | ||
171 | using pipe_rules_ref = std::shared_ptr<pipe_rules>; | |
172 | ||
173 | /* | |
174 | * pipe_handler: extends endpoints_rule to point at the corresponding rules handler | |
175 | */ | |
176 | struct pipe_handler : public endpoints_pair { | |
177 | pipe_rules_ref rules; | |
178 | ||
179 | pipe_handler() {} | |
180 | pipe_handler(pipe_rules_ref& _rules, | |
181 | const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe), | |
182 | rules(_rules) {} | |
183 | bool specific() const { | |
184 | return source.specific() && dest.specific(); | |
185 | } | |
186 | ||
187 | bool find_basic_info_without_tags(const rgw_obj_key& key, | |
188 | std::optional<rgw_user> *user, | |
189 | std::optional<rgw_user> *acl_translation, | |
20effc67 | 190 | std::optional<std::string> *storage_class, |
9f95a23c TL |
191 | rgw_sync_pipe_params::Mode *mode, |
192 | bool *need_more_info) const { | |
193 | if (!rules) { | |
194 | return false; | |
195 | } | |
196 | return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info); | |
197 | } | |
198 | ||
199 | bool find_obj_params(const rgw_obj_key& key, | |
200 | const RGWObjTags::tag_map_t& tags, | |
201 | rgw_sync_pipe_params *params) const { | |
202 | if (!rules) { | |
203 | return false; | |
204 | } | |
205 | return rules->find_obj_params(key, tags, params); | |
206 | } | |
207 | }; | |
208 | ||
209 | struct pipe_set { | |
210 | std::map<endpoints_pair, pipe_rules_ref> rules; | |
20effc67 | 211 | std::multimap<std::string, rgw_sync_bucket_pipe> pipe_map; |
9f95a23c TL |
212 | |
213 | std::set<pipe_handler> handlers; | |
214 | ||
215 | using iterator = std::set<pipe_handler>::iterator; | |
216 | ||
217 | void clear() { | |
218 | rules.clear(); | |
219 | pipe_map.clear(); | |
220 | handlers.clear(); | |
221 | } | |
222 | ||
223 | void insert(const rgw_sync_bucket_pipe& pipe); | |
224 | ||
225 | iterator begin() const { | |
226 | return handlers.begin(); | |
227 | } | |
228 | ||
229 | iterator end() const { | |
230 | return handlers.end(); | |
231 | } | |
232 | ||
233 | void dump(ceph::Formatter *f) const; | |
234 | }; | |
235 | ||
236 | private: | |
237 | ||
238 | CephContext *cct; | |
239 | ||
240 | rgw_zone_id zone_id; | |
241 | std::optional<rgw_bucket> bucket; | |
242 | ||
243 | const RGWBucketSyncFlowManager *parent{nullptr}; | |
244 | ||
20effc67 | 245 | std::map<std::string, rgw_sync_group_pipe_map> flow_groups; |
9f95a23c TL |
246 | |
247 | std::set<rgw_zone_id> all_zones; | |
248 | ||
249 | bool allowed_data_flow(const rgw_zone_id& source_zone, | |
250 | std::optional<rgw_bucket> source_bucket, | |
251 | const rgw_zone_id& dest_zone, | |
252 | std::optional<rgw_bucket> dest_bucket, | |
253 | bool check_activated) const; | |
254 | ||
255 | /* | |
256 | * find all the matching flows om a flow map for a specific bucket | |
257 | */ | |
258 | void update_flow_maps(const rgw_sync_bucket_pipes& pipe); | |
259 | ||
20effc67 | 260 | void init(const DoutPrefixProvider *dpp, const rgw_sync_policy_info& sync_policy); |
9f95a23c TL |
261 | |
262 | public: | |
263 | ||
264 | RGWBucketSyncFlowManager(CephContext *_cct, | |
265 | const rgw_zone_id& _zone_id, | |
266 | std::optional<rgw_bucket> _bucket, | |
267 | const RGWBucketSyncFlowManager *_parent); | |
268 | ||
20effc67 | 269 | void reflect(const DoutPrefixProvider *dpp, std::optional<rgw_bucket> effective_bucket, |
9f95a23c TL |
270 | pipe_set *flow_by_source, |
271 | pipe_set *flow_by_dest, | |
272 | bool only_enabled) const; | |
273 | ||
274 | }; | |
275 | ||
20effc67 | 276 | static inline std::ostream& operator<<(std::ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) { |
9f95a23c TL |
277 | os << e.dest << " -> " << e.source; |
278 | return os; | |
279 | } | |
280 | ||
281 | class RGWBucketSyncPolicyHandler { | |
282 | bool legacy_config{false}; | |
283 | const RGWBucketSyncPolicyHandler *parent{nullptr}; | |
284 | RGWSI_Zone *zone_svc; | |
285 | RGWSI_Bucket_Sync *bucket_sync_svc; | |
286 | rgw_zone_id zone_id; | |
287 | std::optional<RGWBucketInfo> bucket_info; | |
20effc67 | 288 | std::optional<std::map<std::string, bufferlist> > bucket_attrs; |
9f95a23c TL |
289 | std::optional<rgw_bucket> bucket; |
290 | std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr; | |
291 | rgw_sync_policy_info sync_policy; | |
292 | ||
293 | RGWBucketSyncFlowManager::pipe_set source_pipes; | |
294 | RGWBucketSyncFlowManager::pipe_set target_pipes; | |
295 | ||
20effc67 TL |
296 | std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */ |
297 | std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */ | |
9f95a23c TL |
298 | |
299 | std::set<rgw_zone_id> source_zones; | |
300 | std::set<rgw_zone_id> target_zones; | |
301 | ||
302 | std::set<rgw_bucket> source_hints; | |
303 | std::set<rgw_bucket> target_hints; | |
304 | std::set<rgw_sync_bucket_pipe> resolved_sources; | |
305 | std::set<rgw_sync_bucket_pipe> resolved_dests; | |
306 | ||
307 | ||
308 | bool bucket_is_sync_source() const { | |
309 | return !targets.empty() || !resolved_dests.empty(); | |
310 | } | |
311 | ||
312 | bool bucket_is_sync_target() const { | |
313 | return !sources.empty() || !resolved_sources.empty(); | |
314 | } | |
315 | ||
316 | RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, | |
317 | const RGWBucketInfo& _bucket_info, | |
20effc67 | 318 | std::map<std::string, bufferlist>&& _bucket_attrs); |
9f95a23c TL |
319 | |
320 | RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, | |
321 | const rgw_bucket& _bucket, | |
322 | std::optional<rgw_sync_policy_info> _sync_policy); | |
323 | public: | |
324 | RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, | |
325 | RGWSI_SyncModules *sync_modules_svc, | |
326 | RGWSI_Bucket_Sync *bucket_sync_svc, | |
327 | std::optional<rgw_zone_id> effective_zone = std::nullopt); | |
328 | ||
329 | RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info, | |
20effc67 | 330 | std::map<std::string, bufferlist>&& bucket_attrs) const; |
9f95a23c TL |
331 | RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket, |
332 | std::optional<rgw_sync_policy_info> sync_policy) const; | |
333 | ||
b3b6e05e | 334 | int init(const DoutPrefixProvider *dpp, optional_yield y); |
9f95a23c | 335 | |
20effc67 | 336 | void reflect(const DoutPrefixProvider *dpp, RGWBucketSyncFlowManager::pipe_set *psource_pipes, |
9f95a23c | 337 | RGWBucketSyncFlowManager::pipe_set *ptarget_pipes, |
20effc67 TL |
338 | std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources, |
339 | std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets, | |
9f95a23c TL |
340 | std::set<rgw_zone_id> *psource_zones, |
341 | std::set<rgw_zone_id> *ptarget_zones, | |
342 | bool only_enabled) const; | |
343 | ||
344 | void set_resolved_hints(std::set<rgw_sync_bucket_pipe>&& _resolved_sources, | |
345 | std::set<rgw_sync_bucket_pipe>&& _resolved_dests) { | |
346 | resolved_sources = std::move(_resolved_sources); | |
347 | resolved_dests = std::move(_resolved_dests); | |
348 | } | |
349 | ||
350 | const std::set<rgw_sync_bucket_pipe>& get_resolved_source_hints() { | |
351 | return resolved_sources; | |
352 | } | |
353 | ||
354 | const std::set<rgw_sync_bucket_pipe>& get_resolved_dest_hints() { | |
355 | return resolved_dests; | |
356 | } | |
357 | ||
358 | const std::set<rgw_zone_id>& get_source_zones() const { | |
359 | return source_zones; | |
360 | } | |
361 | ||
362 | const std::set<rgw_zone_id>& get_target_zones() const { | |
363 | return target_zones; | |
364 | } | |
365 | ||
20effc67 | 366 | const std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_sources() { |
9f95a23c TL |
367 | return sources; |
368 | } | |
369 | ||
20effc67 TL |
370 | std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources() const; |
371 | std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests() const; | |
372 | std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests_in_zone(const rgw_zone_id& zone_id) const; | |
9f95a23c | 373 | |
20effc67 | 374 | const std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() { |
9f95a23c TL |
375 | return targets; |
376 | } | |
377 | ||
378 | const std::optional<RGWBucketInfo>& get_bucket_info() const { | |
379 | return bucket_info; | |
380 | } | |
381 | ||
20effc67 | 382 | const std::optional<std::map<std::string, bufferlist> >& get_bucket_attrs() const { |
9f95a23c TL |
383 | return bucket_attrs; |
384 | } | |
385 | ||
386 | void get_pipes(RGWBucketSyncFlowManager::pipe_set **_sources, RGWBucketSyncFlowManager::pipe_set **_targets) { /* return raw pipes (with zone name) */ | |
387 | *_sources = &source_pipes; | |
388 | *_targets = &target_pipes; | |
389 | } | |
390 | void get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets, | |
391 | std::optional<rgw_sync_bucket_entity> filter_peer); | |
392 | ||
393 | const std::set<rgw_bucket>& get_source_hints() const { | |
394 | return source_hints; | |
395 | } | |
396 | ||
397 | const std::set<rgw_bucket>& get_target_hints() const { | |
398 | return target_hints; | |
399 | } | |
400 | ||
401 | bool bucket_exports_data() const; | |
402 | bool bucket_imports_data() const; | |
403 | ||
404 | const rgw_sync_policy_info& get_sync_policy() const { | |
405 | return sync_policy; | |
406 | } | |
407 | ||
408 | bool is_legacy_config() const { | |
409 | return legacy_config; | |
410 | } | |
411 | }; | |
412 |