]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_bucket_sync.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_bucket_sync.h
1
2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
4
5 /*
6 * Ceph - scalable distributed file system
7 *
8 * Copyright (C) 2018 Red Hat, Inc.
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #pragma once
18
19 #include "rgw_common.h"
20 #include "rgw_sync_policy.h"
21
22 class RGWSI_Zone;
23 class RGWSI_SyncModules;
24 class RGWSI_Bucket_Sync;
25
26 struct rgw_sync_group_pipe_map;
27 struct rgw_sync_bucket_pipes;
28 struct rgw_sync_policy_info;
29
30 struct rgw_sync_group_pipe_map {
31 rgw_zone_id zone;
32 std::optional<rgw_bucket> bucket;
33
34 rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::UNKNOWN};
35
36 using zb_pipe_map_t = std::multimap<rgw_sync_bucket_entity, rgw_sync_bucket_pipe>;
37
38 zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
39 zb_pipe_map_t dests; /* all the pipes that pull from zone */
40
41 std::set<rgw_zone_id> *pall_zones{nullptr};
42 rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it,
43 used in the case of bucket sync policy, not at the
44 zonegroup level */
45
46 void dump(ceph::Formatter *f) const;
47
48 template <typename CB1, typename CB2>
49 void try_add_to_pipe_map(const rgw_zone_id& source_zone,
50 const rgw_zone_id& dest_zone,
51 const std::vector<rgw_sync_bucket_pipes>& pipes,
52 zb_pipe_map_t *pipe_map,
53 CB1 filter_cb,
54 CB2 call_filter_cb);
55
56 template <typename CB>
57 void try_add_source(const rgw_zone_id& source_zone,
58 const rgw_zone_id& dest_zone,
59 const std::vector<rgw_sync_bucket_pipes>& pipes,
60 CB filter_cb);
61
62 template <typename CB>
63 void try_add_dest(const rgw_zone_id& source_zone,
64 const rgw_zone_id& dest_zone,
65 const std::vector<rgw_sync_bucket_pipes>& pipes,
66 CB filter_cb);
67
68 std::pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
69 const rgw_zone_id& zone,
70 std::optional<rgw_bucket> b) const;
71
72 template <typename CB>
73 void init(const DoutPrefixProvider *dpp, CephContext *cct,
74 const rgw_zone_id& _zone,
75 std::optional<rgw_bucket> _bucket,
76 const rgw_sync_policy_group& group,
77 rgw_sync_data_flow_group *_default_flow,
78 std::set<rgw_zone_id> *_pall_zones,
79 CB filter_cb);
80
81 /*
82 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
83 */
84 std::vector<rgw_sync_bucket_pipe> find_source_pipes(const rgw_zone_id& source_zone,
85 std::optional<rgw_bucket> source_bucket,
86 std::optional<rgw_bucket> dest_bucket) const;
87
88 /*
89 * find all relevant pipes in other zones that pull from a specific
90 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
91 */
92 std::vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
93 const rgw_zone_id& dest_zone,
94 std::optional<rgw_bucket> dest_bucket) const;
95
96 /*
97 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
98 */
99 std::vector<rgw_sync_bucket_pipe> find_pipes(const rgw_zone_id& source_zone,
100 std::optional<rgw_bucket> source_bucket,
101 const rgw_zone_id& dest_zone,
102 std::optional<rgw_bucket> dest_bucket) const;
103 };
104
105 class RGWSyncPolicyCompat {
106 public:
107 static void convert_old_sync_config(RGWSI_Zone *zone_svc,
108 RGWSI_SyncModules *sync_modules_svc,
109 rgw_sync_policy_info *ppolicy);
110 };
111
112 class RGWBucketSyncFlowManager {
113 friend class RGWBucketSyncPolicyHandler;
114 public:
115 struct endpoints_pair {
116 rgw_sync_bucket_entity source;
117 rgw_sync_bucket_entity dest;
118
119 endpoints_pair() {}
120 endpoints_pair(const rgw_sync_bucket_pipe& pipe) {
121 source = pipe.source;
122 dest = pipe.dest;
123 }
124
125 bool operator<(const endpoints_pair& e) const {
126 if (source < e.source) {
127 return true;
128 }
129 if (e.source < source) {
130 return false;
131 }
132 return (dest < e.dest);
133 }
134 };
135
136 /*
137 * pipe_rules: deal with a set of pipes that have common endpoints_pair
138 */
139 class pipe_rules {
140 std::list<rgw_sync_bucket_pipe> pipes;
141
142 public:
143 using prefix_map_t = std::multimap<std::string, rgw_sync_bucket_pipe *>;
144
145 std::map<std::string, rgw_sync_bucket_pipe *> tag_refs;
146 prefix_map_t prefix_refs;
147
148 void insert(const rgw_sync_bucket_pipe& pipe);
149
150 bool find_basic_info_without_tags(const rgw_obj_key& key,
151 std::optional<rgw_user> *user,
152 std::optional<rgw_user> *acl_translation,
153 std::optional<std::string> *storage_class,
154 rgw_sync_pipe_params::Mode *mode,
155 bool *need_more_info) const;
156 bool find_obj_params(const rgw_obj_key& key,
157 const RGWObjTags::tag_map_t& tags,
158 rgw_sync_pipe_params *params) const;
159
160 void scan_prefixes(std::vector<std::string> *prefixes) const;
161
162 prefix_map_t::const_iterator prefix_begin() const {
163 return prefix_refs.begin();
164 }
165 prefix_map_t::const_iterator prefix_search(const std::string& s) const;
166 prefix_map_t::const_iterator prefix_end() const {
167 return prefix_refs.end();
168 }
169 };
170
171 using pipe_rules_ref = std::shared_ptr<pipe_rules>;
172
173 /*
174 * pipe_handler: extends endpoints_rule to point at the corresponding rules handler
175 */
176 struct pipe_handler : public endpoints_pair {
177 pipe_rules_ref rules;
178
179 pipe_handler() {}
180 pipe_handler(pipe_rules_ref& _rules,
181 const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
182 rules(_rules) {}
183 bool specific() const {
184 return source.specific() && dest.specific();
185 }
186
187 bool find_basic_info_without_tags(const rgw_obj_key& key,
188 std::optional<rgw_user> *user,
189 std::optional<rgw_user> *acl_translation,
190 std::optional<std::string> *storage_class,
191 rgw_sync_pipe_params::Mode *mode,
192 bool *need_more_info) const {
193 if (!rules) {
194 return false;
195 }
196 return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info);
197 }
198
199 bool find_obj_params(const rgw_obj_key& key,
200 const RGWObjTags::tag_map_t& tags,
201 rgw_sync_pipe_params *params) const {
202 if (!rules) {
203 return false;
204 }
205 return rules->find_obj_params(key, tags, params);
206 }
207 };
208
209 struct pipe_set {
210 std::map<endpoints_pair, pipe_rules_ref> rules;
211 std::multimap<std::string, rgw_sync_bucket_pipe> pipe_map;
212 std::multimap<std::string, rgw_sync_bucket_pipe> disabled_pipe_map;
213
214 std::set<pipe_handler> handlers;
215
216 using iterator = std::set<pipe_handler>::iterator;
217
218 void clear() {
219 rules.clear();
220 pipe_map.clear();
221 disabled_pipe_map.clear();
222 handlers.clear();
223 }
224
225 void insert(const rgw_sync_bucket_pipe& pipe);
226 void remove_all();
227 void disable(const rgw_sync_bucket_pipe& pipe);
228
229 iterator begin() const {
230 return handlers.begin();
231 }
232
233 iterator end() const {
234 return handlers.end();
235 }
236
237 void dump(ceph::Formatter *f) const;
238 };
239
240 private:
241
242 CephContext *cct;
243
244 rgw_zone_id zone_id;
245 std::optional<rgw_bucket> bucket;
246
247 const RGWBucketSyncFlowManager *parent{nullptr};
248
249 std::map<std::string, rgw_sync_group_pipe_map> flow_groups;
250
251 std::set<rgw_zone_id> all_zones;
252
253 bool allowed_data_flow(const rgw_zone_id& source_zone,
254 std::optional<rgw_bucket> source_bucket,
255 const rgw_zone_id& dest_zone,
256 std::optional<rgw_bucket> dest_bucket,
257 bool check_activated) const;
258
259 /*
260 * find all the matching flows om a flow map for a specific bucket
261 */
262 void update_flow_maps(const rgw_sync_bucket_pipes& pipe);
263
264 void init(const DoutPrefixProvider *dpp, const rgw_sync_policy_info& sync_policy);
265
266 public:
267
268 RGWBucketSyncFlowManager(CephContext *_cct,
269 const rgw_zone_id& _zone_id,
270 std::optional<rgw_bucket> _bucket,
271 const RGWBucketSyncFlowManager *_parent);
272
273 void reflect(const DoutPrefixProvider *dpp, std::optional<rgw_bucket> effective_bucket,
274 pipe_set *flow_by_source,
275 pipe_set *flow_by_dest,
276 bool only_enabled) const;
277
278 };
279
280 static inline std::ostream& operator<<(std::ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
281 os << e.dest << " -> " << e.source;
282 return os;
283 }
284
285 class RGWBucketSyncPolicyHandler {
286 bool legacy_config{false};
287 const RGWBucketSyncPolicyHandler *parent{nullptr};
288 RGWSI_Zone *zone_svc;
289 RGWSI_Bucket_Sync *bucket_sync_svc;
290 rgw_zone_id zone_id;
291 std::optional<RGWBucketInfo> bucket_info;
292 std::optional<std::map<std::string, bufferlist> > bucket_attrs;
293 std::optional<rgw_bucket> bucket;
294 std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
295 rgw_sync_policy_info sync_policy;
296
297 RGWBucketSyncFlowManager::pipe_set source_pipes;
298 RGWBucketSyncFlowManager::pipe_set target_pipes;
299
300 std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */
301 std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */
302
303 std::set<rgw_zone_id> source_zones;
304 std::set<rgw_zone_id> target_zones;
305
306 std::set<rgw_bucket> source_hints;
307 std::set<rgw_bucket> target_hints;
308 std::set<rgw_sync_bucket_pipe> resolved_sources;
309 std::set<rgw_sync_bucket_pipe> resolved_dests;
310
311
312 bool bucket_is_sync_source() const {
313 return !targets.empty() || !resolved_dests.empty();
314 }
315
316 bool bucket_is_sync_target() const {
317 return !sources.empty() || !resolved_sources.empty();
318 }
319
320 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
321 const RGWBucketInfo& _bucket_info,
322 std::map<std::string, bufferlist>&& _bucket_attrs);
323
324 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
325 const rgw_bucket& _bucket,
326 std::optional<rgw_sync_policy_info> _sync_policy);
327 public:
328 RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
329 RGWSI_SyncModules *sync_modules_svc,
330 RGWSI_Bucket_Sync *bucket_sync_svc,
331 std::optional<rgw_zone_id> effective_zone = std::nullopt);
332
333 RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info,
334 std::map<std::string, bufferlist>&& bucket_attrs) const;
335 RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
336 std::optional<rgw_sync_policy_info> sync_policy) const;
337
338 int init(const DoutPrefixProvider *dpp, optional_yield y);
339
340 void reflect(const DoutPrefixProvider *dpp, RGWBucketSyncFlowManager::pipe_set *psource_pipes,
341 RGWBucketSyncFlowManager::pipe_set *ptarget_pipes,
342 std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources,
343 std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets,
344 std::set<rgw_zone_id> *psource_zones,
345 std::set<rgw_zone_id> *ptarget_zones,
346 bool only_enabled) const;
347
348 void set_resolved_hints(std::set<rgw_sync_bucket_pipe>&& _resolved_sources,
349 std::set<rgw_sync_bucket_pipe>&& _resolved_dests) {
350 resolved_sources = std::move(_resolved_sources);
351 resolved_dests = std::move(_resolved_dests);
352 }
353
354 const std::set<rgw_sync_bucket_pipe>& get_resolved_source_hints() {
355 return resolved_sources;
356 }
357
358 const std::set<rgw_sync_bucket_pipe>& get_resolved_dest_hints() {
359 return resolved_dests;
360 }
361
362 const std::set<rgw_zone_id>& get_source_zones() const {
363 return source_zones;
364 }
365
366 const std::set<rgw_zone_id>& get_target_zones() const {
367 return target_zones;
368 }
369
370 const std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_sources() {
371 return sources;
372 }
373
374 std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources() const;
375 std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests() const;
376 std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests_in_zone(const rgw_zone_id& zone_id) const;
377
378 const std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
379 return targets;
380 }
381
382 const std::optional<RGWBucketInfo>& get_bucket_info() const {
383 return bucket_info;
384 }
385
386 const std::optional<std::map<std::string, bufferlist> >& get_bucket_attrs() const {
387 return bucket_attrs;
388 }
389
390 void get_pipes(RGWBucketSyncFlowManager::pipe_set **_sources, RGWBucketSyncFlowManager::pipe_set **_targets) { /* return raw pipes (with zone name) */
391 *_sources = &source_pipes;
392 *_targets = &target_pipes;
393 }
394 void get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
395 std::optional<rgw_sync_bucket_entity> filter_peer);
396
397 const std::set<rgw_bucket>& get_source_hints() const {
398 return source_hints;
399 }
400
401 const std::set<rgw_bucket>& get_target_hints() const {
402 return target_hints;
403 }
404
405 bool bucket_exports_data() const;
406 bool bucket_imports_data() const;
407
408 const rgw_sync_policy_info& get_sync_policy() const {
409 return sync_policy;
410 }
411
412 bool is_legacy_config() const {
413 return legacy_config;
414 }
415 };
416