2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
6 * Ceph - scalable distributed file system
8 * Copyright (C) 2018 Red Hat, Inc.
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
19 #include "rgw_common.h"
20 #include "rgw_sync_policy.h"
// Forward declarations.
// RGWSI_SyncModules and RGWSI_Bucket_Sync appear in the declarations below
// only as pointer types, so their names alone are sufficient here.
// rgw_sync_group_pipe_map is defined right after this group.
// NOTE(review): rgw_sync_policy_info is also stored by value later on
// (RGWBucketSyncPolicyHandler::sync_policy), so its full definition has to
// come from an include -- presumably "rgw_sync_policy.h"; confirm.
23 class RGWSI_SyncModules
;
24 class RGWSI_Bucket_Sync
;
26 struct rgw_sync_group_pipe_map
;
27 struct rgw_sync_bucket_pipes
;
28 struct rgw_sync_policy_info
;
30 struct rgw_sync_group_pipe_map
{
32 std::optional
<rgw_bucket
> bucket
;
34 rgw_sync_policy_group::Status status
{rgw_sync_policy_group::Status::UNKNOWN
};
36 using zb_pipe_map_t
= std::multimap
<rgw_sync_bucket_entity
, rgw_sync_bucket_pipe
>;
38 zb_pipe_map_t sources
; /* all the pipes where zone is pulling from */
39 zb_pipe_map_t dests
; /* all the pipes that pull from zone */
41 std::set
<rgw_zone_id
> *pall_zones
{nullptr};
42 rgw_sync_data_flow_group
*default_flow
{nullptr}; /* flow to use if policy doesn't define it,
43 used in the case of bucket sync policy, not at the
46 void dump(ceph::Formatter
*f
) const;
48 template <typename CB1
, typename CB2
>
49 void try_add_to_pipe_map(const rgw_zone_id
& source_zone
,
50 const rgw_zone_id
& dest_zone
,
51 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
52 zb_pipe_map_t
*pipe_map
,
56 template <typename CB
>
57 void try_add_source(const rgw_zone_id
& source_zone
,
58 const rgw_zone_id
& dest_zone
,
59 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
62 template <typename CB
>
63 void try_add_dest(const rgw_zone_id
& source_zone
,
64 const rgw_zone_id
& dest_zone
,
65 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
// Iterator-pair overload of find_pipes(): takes one pipe map `m` (a
// zb_pipe_map_t, i.e. a multimap keyed by rgw_sync_bucket_entity), a zone
// id and an optional bucket `b`, and returns a pair of const iterators
// into `m`.
// NOTE(review): presumably the pair delimits the [first, second) range of
// entries matching {zone, b}; the definition is not visible here -- confirm.
68 std::pair
<zb_pipe_map_t::const_iterator
, zb_pipe_map_t::const_iterator
> find_pipes(const zb_pipe_map_t
& m
,
69 const rgw_zone_id
& zone
,
70 std::optional
<rgw_bucket
> b
) const;
72 template <typename CB
>
73 void init(const DoutPrefixProvider
*dpp
, CephContext
*cct
,
74 const rgw_zone_id
& _zone
,
75 std::optional
<rgw_bucket
> _bucket
,
76 const rgw_sync_policy_group
& group
,
77 rgw_sync_data_flow_group
*_default_flow
,
78 std::set
<rgw_zone_id
> *_pall_zones
,
82 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
// Parameters: source_zone   - zone on the source side of the pipe
//             source_bucket - optional bucket on the source side
//             dest_bucket   - optional bucket on the destination side
// Returns the matching pipes by value; does not modify this object (const).
// NOTE(review): how an empty optional is treated (wildcard vs. no match) is
// decided in the definition, which is not visible here -- confirm.
84 std::vector
<rgw_sync_bucket_pipe
> find_source_pipes(const rgw_zone_id
& source_zone
,
85 std::optional
<rgw_bucket
> source_bucket
,
86 std::optional
<rgw_bucket
> dest_bucket
) const;
89 * find all relevant pipes in other zones that pull from a specific
90 * source bucket in our zone {source_bucket} -> {dest_zone, dest_bucket}
// Parameters: source_bucket - optional bucket on the source side
//             dest_zone     - zone on the destination side of the pipe
//             dest_bucket   - optional bucket on the destination side
// Returns the matching pipes by value; does not modify this object (const).
// NOTE(review): treatment of an empty optional is defined out of line and
// is not visible here -- confirm.
92 std::vector
<rgw_sync_bucket_pipe
> find_dest_pipes(std::optional
<rgw_bucket
> source_bucket
,
93 const rgw_zone_id
& dest_zone
,
94 std::optional
<rgw_bucket
> dest_bucket
) const;
97 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
// Vector-returning overload of find_pipes(): both endpoints are given
// explicitly as a zone id plus an optional bucket.
// Parameters: source_zone / source_bucket - source endpoint
//             dest_zone   / dest_bucket   - destination endpoint
// Returns the matching pipes by value; does not modify this object (const).
// NOTE(review): presumably dispatches to find_source_pipes() /
// find_dest_pipes() depending on which zone is the local one; the
// definition is not visible here -- confirm.
99 std::vector
<rgw_sync_bucket_pipe
> find_pipes(const rgw_zone_id
& source_zone
,
100 std::optional
<rgw_bucket
> source_bucket
,
101 const rgw_zone_id
& dest_zone
,
102 std::optional
<rgw_bucket
> dest_bucket
) const;
105 class RGWSyncPolicyCompat
{
// Static helper; needs no RGWSyncPolicyCompat instance.
// Parameters: zone_svc         - zone service handle
//             sync_modules_svc - sync-modules service handle
//             ppolicy          - policy object passed by pointer
// NOTE(review): judging by the name, this presumably populates *ppolicy
// from the pre-sync-policy (legacy) zone configuration; the definition is
// not visible here -- confirm, including whether null pointers are allowed.
107 static void convert_old_sync_config(RGWSI_Zone
*zone_svc
,
108 RGWSI_SyncModules
*sync_modules_svc
,
109 rgw_sync_policy_info
*ppolicy
);
112 class RGWBucketSyncFlowManager
{
113 friend class RGWBucketSyncPolicyHandler
;
115 struct endpoints_pair
{
116 rgw_sync_bucket_entity source
;
117 rgw_sync_bucket_entity dest
;
120 endpoints_pair(const rgw_sync_bucket_pipe
& pipe
) {
121 source
= pipe
.source
;
125 bool operator<(const endpoints_pair
& e
) const {
126 if (source
< e
.source
) {
129 if (e
.source
< source
) {
132 return (dest
< e
.dest
);
137 * pipe_rules: deal with a set of pipes that have common endpoints_pair
140 std::list
<rgw_sync_bucket_pipe
> pipes
;
143 using prefix_map_t
= std::multimap
<std::string
, rgw_sync_bucket_pipe
*>;
145 std::map
<std::string
, rgw_sync_bucket_pipe
*> tag_refs
;
146 prefix_map_t prefix_refs
;
148 void insert(const rgw_sync_bucket_pipe
& pipe
);
// Looks up sync parameters for object `key` without consulting object tags.
// Parameters: key             - object key being evaluated
//             user, acl_translation, storage_class, mode, need_more_info
//                             - passed by pointer
// NOTE(review): the pointer parameters are presumably outputs, and
// *need_more_info presumably signals that a tag-aware lookup
// (find_obj_params) is required; the meaning of the bool return is also
// defined out of line -- confirm against the definition.
150 bool find_basic_info_without_tags(const rgw_obj_key
& key
,
151 std::optional
<rgw_user
> *user
,
152 std::optional
<rgw_user
> *acl_translation
,
153 std::optional
<std::string
> *storage_class
,
154 rgw_sync_pipe_params::Mode
*mode
,
155 bool *need_more_info
) const;
// Tag-aware lookup: takes the object key together with its tag map.
// Parameters: key    - object key being evaluated
//             tags   - the object's tags
//             params - passed by pointer
// NOTE(review): *params is presumably filled with the parameters of the
// matching pipe and the return value presumably reports whether a match
// was found; the definition is not visible here -- confirm.
156 bool find_obj_params(const rgw_obj_key
& key
,
157 const RGWObjTags::tag_map_t
& tags
,
158 rgw_sync_pipe_params
*params
) const;
160 void scan_prefixes(std::vector
<std::string
> *prefixes
) const;
162 prefix_map_t::const_iterator
prefix_begin() const {
163 return prefix_refs
.begin();
165 prefix_map_t::const_iterator
prefix_search(const std::string
& s
) const;
166 prefix_map_t::const_iterator
prefix_end() const {
167 return prefix_refs
.end();
171 using pipe_rules_ref
= std::shared_ptr
<pipe_rules
>;
174 * pipe_handler: extends endpoints_pair to point at the corresponding rules handler
176 struct pipe_handler
: public endpoints_pair
{
177 pipe_rules_ref rules
;
180 pipe_handler(pipe_rules_ref
& _rules
,
181 const rgw_sync_bucket_pipe
& _pipe
) : endpoints_pair(_pipe
),
183 bool specific() const {
184 return source
.specific() && dest
.specific();
187 bool find_basic_info_without_tags(const rgw_obj_key
& key
,
188 std::optional
<rgw_user
> *user
,
189 std::optional
<rgw_user
> *acl_translation
,
190 std::optional
<std::string
> *storage_class
,
191 rgw_sync_pipe_params::Mode
*mode
,
192 bool *need_more_info
) const {
196 return rules
->find_basic_info_without_tags(key
, user
, acl_translation
, storage_class
, mode
, need_more_info
);
199 bool find_obj_params(const rgw_obj_key
& key
,
200 const RGWObjTags::tag_map_t
& tags
,
201 rgw_sync_pipe_params
*params
) const {
205 return rules
->find_obj_params(key
, tags
, params
);
210 std::map
<endpoints_pair
, pipe_rules_ref
> rules
;
211 std::multimap
<std::string
, rgw_sync_bucket_pipe
> pipe_map
;
212 std::multimap
<std::string
, rgw_sync_bucket_pipe
> disabled_pipe_map
;
214 std::set
<pipe_handler
> handlers
;
216 using iterator
= std::set
<pipe_handler
>::iterator
;
221 disabled_pipe_map
.clear();
225 void insert(const rgw_sync_bucket_pipe
& pipe
);
227 void disable(const rgw_sync_bucket_pipe
& pipe
);
229 iterator
begin() const {
230 return handlers
.begin();
233 iterator
end() const {
234 return handlers
.end();
237 void dump(ceph::Formatter
*f
) const;
245 std::optional
<rgw_bucket
> bucket
;
247 const RGWBucketSyncFlowManager
*parent
{nullptr};
249 std::map
<std::string
, rgw_sync_group_pipe_map
> flow_groups
;
251 std::set
<rgw_zone_id
> all_zones
;
// Predicate over a {source_zone, source_bucket} -> {dest_zone, dest_bucket}
// pair; does not modify the flow manager (const).
// Parameters: source_zone / source_bucket - source endpoint
//             dest_zone   / dest_bucket   - destination endpoint
//             check_activated             - boolean switch
// NOTE(review): presumably returns true when some flow group permits data
// to move between the two endpoints, with check_activated selecting whether
// a group must be in an activated state to count; the definition is not
// visible here -- confirm.
253 bool allowed_data_flow(const rgw_zone_id
& source_zone
,
254 std::optional
<rgw_bucket
> source_bucket
,
255 const rgw_zone_id
& dest_zone
,
256 std::optional
<rgw_bucket
> dest_bucket
,
257 bool check_activated
) const;
260 * find all the matching flows in a flow map for a specific bucket
262 void update_flow_maps(const rgw_sync_bucket_pipes
& pipe
);
264 void init(const DoutPrefixProvider
*dpp
, const rgw_sync_policy_info
& sync_policy
);
268 RGWBucketSyncFlowManager(CephContext
*_cct
,
269 const rgw_zone_id
& _zone_id
,
270 std::optional
<rgw_bucket
> _bucket
,
271 const RGWBucketSyncFlowManager
*_parent
);
// Parameters: dpp              - debug-output prefix provider
//             effective_bucket - optional bucket to evaluate for
//             flow_by_source   - pipe_set passed by pointer
//             flow_by_dest     - pipe_set passed by pointer
//             only_enabled     - boolean switch
// The manager itself is not modified (const).
// NOTE(review): the two pipe_set pointers are presumably outputs that
// receive the pipes grouped by source and by destination respectively, and
// only_enabled presumably filters out pipes that are not enabled; the
// definition is not visible here -- confirm.
273 void reflect(const DoutPrefixProvider
*dpp
, std::optional
<rgw_bucket
> effective_bucket
,
274 pipe_set
*flow_by_source
,
275 pipe_set
*flow_by_dest
,
276 bool only_enabled
) const;
280 static inline std::ostream
& operator<<(std::ostream
& os
, const RGWBucketSyncFlowManager::endpoints_pair
& e
) {
281 os
<< e
.dest
<< " -> " << e
.source
;
285 class RGWBucketSyncPolicyHandler
{
286 bool legacy_config
{false};
287 const RGWBucketSyncPolicyHandler
*parent
{nullptr};
288 RGWSI_Zone
*zone_svc
;
289 RGWSI_Bucket_Sync
*bucket_sync_svc
;
291 std::optional
<RGWBucketInfo
> bucket_info
;
292 std::optional
<std::map
<std::string
, bufferlist
> > bucket_attrs
;
293 std::optional
<rgw_bucket
> bucket
;
294 std::unique_ptr
<RGWBucketSyncFlowManager
> flow_mgr
;
295 rgw_sync_policy_info sync_policy
;
297 RGWBucketSyncFlowManager::pipe_set source_pipes
;
298 RGWBucketSyncFlowManager::pipe_set target_pipes
;
300 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> sources
; /* source pipes by source zone id */
301 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> targets
; /* target pipes by target zone id */
303 std::set
<rgw_zone_id
> source_zones
;
304 std::set
<rgw_zone_id
> target_zones
;
306 std::set
<rgw_bucket
> source_hints
;
307 std::set
<rgw_bucket
> target_hints
;
308 std::set
<rgw_sync_bucket_pipe
> resolved_sources
;
309 std::set
<rgw_sync_bucket_pipe
> resolved_dests
;
312 bool bucket_is_sync_source() const {
313 return !targets
.empty() || !resolved_dests
.empty();
316 bool bucket_is_sync_target() const {
317 return !sources
.empty() || !resolved_sources
.empty();
320 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler
*_parent
,
321 const RGWBucketInfo
& _bucket_info
,
322 std::map
<std::string
, bufferlist
>&& _bucket_attrs
);
324 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler
*_parent
,
325 const rgw_bucket
& _bucket
,
326 std::optional
<rgw_sync_policy_info
> _sync_policy
);
328 RGWBucketSyncPolicyHandler(RGWSI_Zone
*_zone_svc
,
329 RGWSI_SyncModules
*sync_modules_svc
,
330 RGWSI_Bucket_Sync
*bucket_sync_svc
,
331 std::optional
<rgw_zone_id
> effective_zone
= std::nullopt
);
// Two alloc_child() overloads whose parameter lists mirror the two
// parent-taking constructors declared above them in this class (bucket
// info + attrs, or bucket + optional sync policy).
// NOTE(review): presumably each allocates a new handler with `this` as its
// parent. The return type is a raw pointer, so ownership is not expressed
// in the signature -- presumably the caller owns the result; confirm
// against the definition and the call sites.
333 RGWBucketSyncPolicyHandler
*alloc_child(const RGWBucketInfo
& bucket_info
,
334 std::map
<std::string
, bufferlist
>&& bucket_attrs
) const;
335 RGWBucketSyncPolicyHandler
*alloc_child(const rgw_bucket
& bucket
,
336 std::optional
<rgw_sync_policy_info
> sync_policy
) const;
338 int init(const DoutPrefixProvider
*dpp
, optional_yield y
);
// Parameters: dpp           - debug-output prefix provider
//             psource_pipes, ptarget_pipes  - pipe sets, by pointer
//             psources, ptargets            - per-zone pipe-set maps, by pointer
//             psource_zones, ptarget_zones  - zone-id sets, by pointer
//             only_enabled  - boolean switch
// The handler itself is not modified (const).
// NOTE(review): the six pointer parameters match, in type and naming, the
// members source_pipes / target_pipes / sources / targets / source_zones /
// target_zones of this class, so they are presumably outputs filled with
// the same kind of data; the definition is not visible here -- confirm,
// including whether null pointers are accepted.
340 void reflect(const DoutPrefixProvider
*dpp
, RGWBucketSyncFlowManager::pipe_set
*psource_pipes
,
341 RGWBucketSyncFlowManager::pipe_set
*ptarget_pipes
,
342 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> *psources
,
343 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> *ptargets
,
344 std::set
<rgw_zone_id
> *psource_zones
,
345 std::set
<rgw_zone_id
> *ptarget_zones
,
346 bool only_enabled
) const;
348 void set_resolved_hints(std::set
<rgw_sync_bucket_pipe
>&& _resolved_sources
,
349 std::set
<rgw_sync_bucket_pipe
>&& _resolved_dests
) {
350 resolved_sources
= std::move(_resolved_sources
);
351 resolved_dests
= std::move(_resolved_dests
);
354 const std::set
<rgw_sync_bucket_pipe
>& get_resolved_source_hints() {
355 return resolved_sources
;
358 const std::set
<rgw_sync_bucket_pipe
>& get_resolved_dest_hints() {
359 return resolved_dests
;
362 const std::set
<rgw_zone_id
>& get_source_zones() const {
366 const std::set
<rgw_zone_id
>& get_target_zones() const {
370 const std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
>& get_sources() {
// Three accessors that each return, by value, a multimap from zone id to
// rgw_sync_bucket_pipe; none of them modifies the handler (const).
// get_all_dests_in_zone() additionally takes the zone id to restrict to.
// NOTE(review): presumably these flatten the per-zone pipe sets
// (sources / targets) and may also include the resolved hint pipes; the
// definitions are not visible here -- confirm.
374 std::multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> get_all_sources() const;
375 std::multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> get_all_dests() const;
376 std::multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> get_all_dests_in_zone(const rgw_zone_id
& zone_id
) const;
378 const std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
>& get_targets() {
382 const std::optional
<RGWBucketInfo
>& get_bucket_info() const {
386 const std::optional
<std::map
<std::string
, bufferlist
> >& get_bucket_attrs() const {
390 void get_pipes(RGWBucketSyncFlowManager::pipe_set
**_sources
, RGWBucketSyncFlowManager::pipe_set
**_targets
) { /* return raw pipes (with zone name) */
391 *_sources
= &source_pipes
;
392 *_targets
= &target_pipes
;
394 void get_pipes(std::set
<rgw_sync_bucket_pipe
> *sources
, std::set
<rgw_sync_bucket_pipe
> *targets
,
395 std::optional
<rgw_sync_bucket_entity
> filter_peer
);
397 const std::set
<rgw_bucket
>& get_source_hints() const {
401 const std::set
<rgw_bucket
>& get_target_hints() const {
// Const predicates defined out of line (unlike the inline
// bucket_is_sync_source() / bucket_is_sync_target() earlier in this class).
// NOTE(review): presumably report whether this bucket sends data to /
// receives data from other zones under the current policy; how they differ
// from bucket_is_sync_source()/target() is decided in the definitions,
// which are not visible here -- confirm.
405 bool bucket_exports_data() const;
406 bool bucket_imports_data() const;
408 const rgw_sync_policy_info
& get_sync_policy() const {
412 bool is_legacy_config() const {
413 return legacy_config
;