]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_bucket_sync.h
488060b7a60acb929da0d67bddeddc18d7889928
[ceph.git] / ceph / src / rgw / rgw_bucket_sync.h
1
2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
4
5 /*
6 * Ceph - scalable distributed file system
7 *
8 * Copyright (C) 2018 Red Hat, Inc.
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #pragma once
18
19 #include "rgw_common.h"
20 #include "rgw_sync_policy.h"
21
22 class RGWSI_Zone;
23 class RGWSI_SyncModules;
24 class RGWSI_Bucket_Sync;
25
26 struct rgw_sync_group_pipe_map;
27 struct rgw_sync_bucket_pipes;
28 struct rgw_sync_policy_info;
29
30 struct rgw_sync_group_pipe_map {
31 rgw_zone_id zone;
32 std::optional<rgw_bucket> bucket;
33
34 rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
35
36 using zb_pipe_map_t = std::multimap<rgw_sync_bucket_entity, rgw_sync_bucket_pipe>;
37
38 zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
39 zb_pipe_map_t dests; /* all the pipes that pull from zone */
40
41 std::set<rgw_zone_id> *pall_zones{nullptr};
42 rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it,
43 used in the case of bucket sync policy, not at the
44 zonegroup level */
45
46 void dump(ceph::Formatter *f) const;
47
48 template <typename CB1, typename CB2>
49 void try_add_to_pipe_map(const rgw_zone_id& source_zone,
50 const rgw_zone_id& dest_zone,
51 const std::vector<rgw_sync_bucket_pipes>& pipes,
52 zb_pipe_map_t *pipe_map,
53 CB1 filter_cb,
54 CB2 call_filter_cb);
55
56 template <typename CB>
57 void try_add_source(const rgw_zone_id& source_zone,
58 const rgw_zone_id& dest_zone,
59 const std::vector<rgw_sync_bucket_pipes>& pipes,
60 CB filter_cb);
61
62 template <typename CB>
63 void try_add_dest(const rgw_zone_id& source_zone,
64 const rgw_zone_id& dest_zone,
65 const std::vector<rgw_sync_bucket_pipes>& pipes,
66 CB filter_cb);
67
68 pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
69 const rgw_zone_id& zone,
70 std::optional<rgw_bucket> b) const;
71
72 template <typename CB>
73 void init(CephContext *cct,
74 const rgw_zone_id& _zone,
75 std::optional<rgw_bucket> _bucket,
76 const rgw_sync_policy_group& group,
77 rgw_sync_data_flow_group *_default_flow,
78 std::set<rgw_zone_id> *_pall_zones,
79 CB filter_cb);
80
81 /*
82 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
83 */
84 vector<rgw_sync_bucket_pipe> find_source_pipes(const rgw_zone_id& source_zone,
85 std::optional<rgw_bucket> source_bucket,
86 std::optional<rgw_bucket> dest_bucket) const;
87
88 /*
89 * find all relevant pipes in other zones that pull from a specific
90 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
91 */
92 vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
93 const rgw_zone_id& dest_zone,
94 std::optional<rgw_bucket> dest_bucket) const;
95
96 /*
97 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
98 */
99 vector<rgw_sync_bucket_pipe> find_pipes(const rgw_zone_id& source_zone,
100 std::optional<rgw_bucket> source_bucket,
101 const rgw_zone_id& dest_zone,
102 std::optional<rgw_bucket> dest_bucket) const;
103 };
104
105 class RGWSyncPolicyCompat {
106 public:
107 static void convert_old_sync_config(RGWSI_Zone *zone_svc,
108 RGWSI_SyncModules *sync_modules_svc,
109 rgw_sync_policy_info *ppolicy);
110 };
111
112 class RGWBucketSyncFlowManager {
113 friend class RGWBucketSyncPolicyHandler;
114 public:
115 struct endpoints_pair {
116 rgw_sync_bucket_entity source;
117 rgw_sync_bucket_entity dest;
118
119 endpoints_pair() {}
120 endpoints_pair(const rgw_sync_bucket_pipe& pipe) {
121 source = pipe.source;
122 dest = pipe.dest;
123 }
124
125 bool operator<(const endpoints_pair& e) const {
126 if (source < e.source) {
127 return true;
128 }
129 if (e.source < source) {
130 return false;
131 }
132 return (dest < e.dest);
133 }
134 };
135
136 /*
137 * pipe_rules: deal with a set of pipes that have common endpoints_pair
138 */
139 class pipe_rules {
140 std::list<rgw_sync_bucket_pipe> pipes;
141
142 public:
143 using prefix_map_t = multimap<string, rgw_sync_bucket_pipe *>;
144
145 map<string, rgw_sync_bucket_pipe *> tag_refs;
146 prefix_map_t prefix_refs;
147
148 void insert(const rgw_sync_bucket_pipe& pipe);
149
150 bool find_basic_info_without_tags(const rgw_obj_key& key,
151 std::optional<rgw_user> *user,
152 std::optional<rgw_user> *acl_translation,
153 std::optional<string> *storage_class,
154 rgw_sync_pipe_params::Mode *mode,
155 bool *need_more_info) const;
156 bool find_obj_params(const rgw_obj_key& key,
157 const RGWObjTags::tag_map_t& tags,
158 rgw_sync_pipe_params *params) const;
159
160 void scan_prefixes(std::vector<string> *prefixes) const;
161
162 prefix_map_t::const_iterator prefix_begin() const {
163 return prefix_refs.begin();
164 }
165 prefix_map_t::const_iterator prefix_search(const std::string& s) const;
166 prefix_map_t::const_iterator prefix_end() const {
167 return prefix_refs.end();
168 }
169 };
170
171 using pipe_rules_ref = std::shared_ptr<pipe_rules>;
172
173 /*
174 * pipe_handler: extends endpoints_rule to point at the corresponding rules handler
175 */
176 struct pipe_handler : public endpoints_pair {
177 pipe_rules_ref rules;
178
179 pipe_handler() {}
180 pipe_handler(pipe_rules_ref& _rules,
181 const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
182 rules(_rules) {}
183 bool specific() const {
184 return source.specific() && dest.specific();
185 }
186
187 bool find_basic_info_without_tags(const rgw_obj_key& key,
188 std::optional<rgw_user> *user,
189 std::optional<rgw_user> *acl_translation,
190 std::optional<string> *storage_class,
191 rgw_sync_pipe_params::Mode *mode,
192 bool *need_more_info) const {
193 if (!rules) {
194 return false;
195 }
196 return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info);
197 }
198
199 bool find_obj_params(const rgw_obj_key& key,
200 const RGWObjTags::tag_map_t& tags,
201 rgw_sync_pipe_params *params) const {
202 if (!rules) {
203 return false;
204 }
205 return rules->find_obj_params(key, tags, params);
206 }
207 };
208
209 struct pipe_set {
210 std::map<endpoints_pair, pipe_rules_ref> rules;
211 std::multimap<string, rgw_sync_bucket_pipe> pipe_map;
212
213 std::set<pipe_handler> handlers;
214
215 using iterator = std::set<pipe_handler>::iterator;
216
217 void clear() {
218 rules.clear();
219 pipe_map.clear();
220 handlers.clear();
221 }
222
223 void insert(const rgw_sync_bucket_pipe& pipe);
224
225 iterator begin() const {
226 return handlers.begin();
227 }
228
229 iterator end() const {
230 return handlers.end();
231 }
232
233 void dump(ceph::Formatter *f) const;
234 };
235
236 private:
237
238 CephContext *cct;
239
240 rgw_zone_id zone_id;
241 std::optional<rgw_bucket> bucket;
242
243 const RGWBucketSyncFlowManager *parent{nullptr};
244
245 map<string, rgw_sync_group_pipe_map> flow_groups;
246
247 std::set<rgw_zone_id> all_zones;
248
249 bool allowed_data_flow(const rgw_zone_id& source_zone,
250 std::optional<rgw_bucket> source_bucket,
251 const rgw_zone_id& dest_zone,
252 std::optional<rgw_bucket> dest_bucket,
253 bool check_activated) const;
254
255 /*
256 * find all the matching flows om a flow map for a specific bucket
257 */
258 void update_flow_maps(const rgw_sync_bucket_pipes& pipe);
259
260 void init(const rgw_sync_policy_info& sync_policy);
261
262 public:
263
264 RGWBucketSyncFlowManager(CephContext *_cct,
265 const rgw_zone_id& _zone_id,
266 std::optional<rgw_bucket> _bucket,
267 const RGWBucketSyncFlowManager *_parent);
268
269 void reflect(std::optional<rgw_bucket> effective_bucket,
270 pipe_set *flow_by_source,
271 pipe_set *flow_by_dest,
272 bool only_enabled) const;
273
274 };
275
276 static inline ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
277 os << e.dest << " -> " << e.source;
278 return os;
279 }
280
281 class RGWBucketSyncPolicyHandler {
282 bool legacy_config{false};
283 const RGWBucketSyncPolicyHandler *parent{nullptr};
284 RGWSI_Zone *zone_svc;
285 RGWSI_Bucket_Sync *bucket_sync_svc;
286 rgw_zone_id zone_id;
287 std::optional<RGWBucketInfo> bucket_info;
288 std::optional<map<string, bufferlist> > bucket_attrs;
289 std::optional<rgw_bucket> bucket;
290 std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
291 rgw_sync_policy_info sync_policy;
292
293 RGWBucketSyncFlowManager::pipe_set source_pipes;
294 RGWBucketSyncFlowManager::pipe_set target_pipes;
295
296 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */
297 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */
298
299 std::set<rgw_zone_id> source_zones;
300 std::set<rgw_zone_id> target_zones;
301
302 std::set<rgw_bucket> source_hints;
303 std::set<rgw_bucket> target_hints;
304 std::set<rgw_sync_bucket_pipe> resolved_sources;
305 std::set<rgw_sync_bucket_pipe> resolved_dests;
306
307
308 bool bucket_is_sync_source() const {
309 return !targets.empty() || !resolved_dests.empty();
310 }
311
312 bool bucket_is_sync_target() const {
313 return !sources.empty() || !resolved_sources.empty();
314 }
315
316 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
317 const RGWBucketInfo& _bucket_info,
318 map<string, bufferlist>&& _bucket_attrs);
319
320 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
321 const rgw_bucket& _bucket,
322 std::optional<rgw_sync_policy_info> _sync_policy);
323 public:
324 RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
325 RGWSI_SyncModules *sync_modules_svc,
326 RGWSI_Bucket_Sync *bucket_sync_svc,
327 std::optional<rgw_zone_id> effective_zone = std::nullopt);
328
329 RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info,
330 map<string, bufferlist>&& bucket_attrs) const;
331 RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
332 std::optional<rgw_sync_policy_info> sync_policy) const;
333
334 int init(optional_yield y);
335
336 void reflect(RGWBucketSyncFlowManager::pipe_set *psource_pipes,
337 RGWBucketSyncFlowManager::pipe_set *ptarget_pipes,
338 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources,
339 map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets,
340 std::set<rgw_zone_id> *psource_zones,
341 std::set<rgw_zone_id> *ptarget_zones,
342 bool only_enabled) const;
343
344 void set_resolved_hints(std::set<rgw_sync_bucket_pipe>&& _resolved_sources,
345 std::set<rgw_sync_bucket_pipe>&& _resolved_dests) {
346 resolved_sources = std::move(_resolved_sources);
347 resolved_dests = std::move(_resolved_dests);
348 }
349
350 const std::set<rgw_sync_bucket_pipe>& get_resolved_source_hints() {
351 return resolved_sources;
352 }
353
354 const std::set<rgw_sync_bucket_pipe>& get_resolved_dest_hints() {
355 return resolved_dests;
356 }
357
358 const std::set<rgw_zone_id>& get_source_zones() const {
359 return source_zones;
360 }
361
362 const std::set<rgw_zone_id>& get_target_zones() const {
363 return target_zones;
364 }
365
366 const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_sources() {
367 return sources;
368 }
369
370 multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources() const;
371 multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests() const;
372 multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests_in_zone(const rgw_zone_id& zone_id) const;
373
374 const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
375 return targets;
376 }
377
378 const std::optional<RGWBucketInfo>& get_bucket_info() const {
379 return bucket_info;
380 }
381
382 const std::optional<map<string, bufferlist> >& get_bucket_attrs() const {
383 return bucket_attrs;
384 }
385
386 void get_pipes(RGWBucketSyncFlowManager::pipe_set **_sources, RGWBucketSyncFlowManager::pipe_set **_targets) { /* return raw pipes (with zone name) */
387 *_sources = &source_pipes;
388 *_targets = &target_pipes;
389 }
390 void get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
391 std::optional<rgw_sync_bucket_entity> filter_peer);
392
393 const std::set<rgw_bucket>& get_source_hints() const {
394 return source_hints;
395 }
396
397 const std::set<rgw_bucket>& get_target_hints() const {
398 return target_hints;
399 }
400
401 bool bucket_exports_data() const;
402 bool bucket_imports_data() const;
403
404 const rgw_sync_policy_info& get_sync_policy() const {
405 return sync_policy;
406 }
407
408 bool is_legacy_config() const {
409 return legacy_config;
410 }
411 };
412