2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
6 * Ceph - scalable distributed file system
8 * Copyright (C) 2018 Red Hat, Inc.
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
19 #include "rgw_common.h"
20 #include "rgw_sync_policy.h"
23 class RGWSI_SyncModules
;
24 class RGWSI_Bucket_Sync
;
26 struct rgw_sync_group_pipe_map
;
27 struct rgw_sync_bucket_pipes
;
28 struct rgw_sync_policy_info
;
30 struct rgw_sync_group_pipe_map
{
32 std::optional
<rgw_bucket
> bucket
;
34 rgw_sync_policy_group::Status status
{rgw_sync_policy_group::Status::FORBIDDEN
};
36 using zb_pipe_map_t
= std::multimap
<rgw_sync_bucket_entity
, rgw_sync_bucket_pipe
>;
38 zb_pipe_map_t sources
; /* all the pipes where zone is pulling from */
39 zb_pipe_map_t dests
; /* all the pipes that pull from zone */
41 std::set
<rgw_zone_id
> *pall_zones
{nullptr};
42 rgw_sync_data_flow_group
*default_flow
{nullptr}; /* flow to use if policy doesn't define it,
43 used in the case of bucket sync policy, not at the
46 void dump(ceph::Formatter
*f
) const;
48 template <typename CB1
, typename CB2
>
49 void try_add_to_pipe_map(const rgw_zone_id
& source_zone
,
50 const rgw_zone_id
& dest_zone
,
51 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
52 zb_pipe_map_t
*pipe_map
,
56 template <typename CB
>
57 void try_add_source(const rgw_zone_id
& source_zone
,
58 const rgw_zone_id
& dest_zone
,
59 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
62 template <typename CB
>
63 void try_add_dest(const rgw_zone_id
& source_zone
,
64 const rgw_zone_id
& dest_zone
,
65 const std::vector
<rgw_sync_bucket_pipes
>& pipes
,
68 std::pair
<zb_pipe_map_t::const_iterator
, zb_pipe_map_t::const_iterator
> find_pipes(const zb_pipe_map_t
& m
,
69 const rgw_zone_id
& zone
,
70 std::optional
<rgw_bucket
> b
) const;
72 template <typename CB
>
73 void init(const DoutPrefixProvider
*dpp
, CephContext
*cct
,
74 const rgw_zone_id
& _zone
,
75 std::optional
<rgw_bucket
> _bucket
,
76 const rgw_sync_policy_group
& group
,
77 rgw_sync_data_flow_group
*_default_flow
,
78 std::set
<rgw_zone_id
> *_pall_zones
,
82 * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
84 std::vector
<rgw_sync_bucket_pipe
> find_source_pipes(const rgw_zone_id
& source_zone
,
85 std::optional
<rgw_bucket
> source_bucket
,
86 std::optional
<rgw_bucket
> dest_bucket
) const;
89 * find all relevant pipes in other zones that pull from a specific
90 * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
92 std::vector
<rgw_sync_bucket_pipe
> find_dest_pipes(std::optional
<rgw_bucket
> source_bucket
,
93 const rgw_zone_id
& dest_zone
,
94 std::optional
<rgw_bucket
> dest_bucket
) const;
97 * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
99 std::vector
<rgw_sync_bucket_pipe
> find_pipes(const rgw_zone_id
& source_zone
,
100 std::optional
<rgw_bucket
> source_bucket
,
101 const rgw_zone_id
& dest_zone
,
102 std::optional
<rgw_bucket
> dest_bucket
) const;
105 class RGWSyncPolicyCompat
{
107 static void convert_old_sync_config(RGWSI_Zone
*zone_svc
,
108 RGWSI_SyncModules
*sync_modules_svc
,
109 rgw_sync_policy_info
*ppolicy
);
112 class RGWBucketSyncFlowManager
{
113 friend class RGWBucketSyncPolicyHandler
;
115 struct endpoints_pair
{
116 rgw_sync_bucket_entity source
;
117 rgw_sync_bucket_entity dest
;
120 endpoints_pair(const rgw_sync_bucket_pipe
& pipe
) {
121 source
= pipe
.source
;
125 bool operator<(const endpoints_pair
& e
) const {
126 if (source
< e
.source
) {
129 if (e
.source
< source
) {
132 return (dest
< e
.dest
);
137 * pipe_rules: deal with a set of pipes that have common endpoints_pair
140 std::list
<rgw_sync_bucket_pipe
> pipes
;
143 using prefix_map_t
= std::multimap
<std::string
, rgw_sync_bucket_pipe
*>;
145 std::map
<std::string
, rgw_sync_bucket_pipe
*> tag_refs
;
146 prefix_map_t prefix_refs
;
148 void insert(const rgw_sync_bucket_pipe
& pipe
);
150 bool find_basic_info_without_tags(const rgw_obj_key
& key
,
151 std::optional
<rgw_user
> *user
,
152 std::optional
<rgw_user
> *acl_translation
,
153 std::optional
<std::string
> *storage_class
,
154 rgw_sync_pipe_params::Mode
*mode
,
155 bool *need_more_info
) const;
156 bool find_obj_params(const rgw_obj_key
& key
,
157 const RGWObjTags::tag_map_t
& tags
,
158 rgw_sync_pipe_params
*params
) const;
160 void scan_prefixes(std::vector
<std::string
> *prefixes
) const;
162 prefix_map_t::const_iterator
prefix_begin() const {
163 return prefix_refs
.begin();
165 prefix_map_t::const_iterator
prefix_search(const std::string
& s
) const;
166 prefix_map_t::const_iterator
prefix_end() const {
167 return prefix_refs
.end();
171 using pipe_rules_ref
= std::shared_ptr
<pipe_rules
>;
174 * pipe_handler: extends endpoints_rule to point at the corresponding rules handler
176 struct pipe_handler
: public endpoints_pair
{
177 pipe_rules_ref rules
;
180 pipe_handler(pipe_rules_ref
& _rules
,
181 const rgw_sync_bucket_pipe
& _pipe
) : endpoints_pair(_pipe
),
183 bool specific() const {
184 return source
.specific() && dest
.specific();
187 bool find_basic_info_without_tags(const rgw_obj_key
& key
,
188 std::optional
<rgw_user
> *user
,
189 std::optional
<rgw_user
> *acl_translation
,
190 std::optional
<std::string
> *storage_class
,
191 rgw_sync_pipe_params::Mode
*mode
,
192 bool *need_more_info
) const {
196 return rules
->find_basic_info_without_tags(key
, user
, acl_translation
, storage_class
, mode
, need_more_info
);
199 bool find_obj_params(const rgw_obj_key
& key
,
200 const RGWObjTags::tag_map_t
& tags
,
201 rgw_sync_pipe_params
*params
) const {
205 return rules
->find_obj_params(key
, tags
, params
);
210 std::map
<endpoints_pair
, pipe_rules_ref
> rules
;
211 std::multimap
<std::string
, rgw_sync_bucket_pipe
> pipe_map
;
213 std::set
<pipe_handler
> handlers
;
215 using iterator
= std::set
<pipe_handler
>::iterator
;
223 void insert(const rgw_sync_bucket_pipe
& pipe
);
225 iterator
begin() const {
226 return handlers
.begin();
229 iterator
end() const {
230 return handlers
.end();
233 void dump(ceph::Formatter
*f
) const;
241 std::optional
<rgw_bucket
> bucket
;
243 const RGWBucketSyncFlowManager
*parent
{nullptr};
245 std::map
<std::string
, rgw_sync_group_pipe_map
> flow_groups
;
247 std::set
<rgw_zone_id
> all_zones
;
249 bool allowed_data_flow(const rgw_zone_id
& source_zone
,
250 std::optional
<rgw_bucket
> source_bucket
,
251 const rgw_zone_id
& dest_zone
,
252 std::optional
<rgw_bucket
> dest_bucket
,
253 bool check_activated
) const;
256 * find all the matching flows om a flow map for a specific bucket
258 void update_flow_maps(const rgw_sync_bucket_pipes
& pipe
);
260 void init(const DoutPrefixProvider
*dpp
, const rgw_sync_policy_info
& sync_policy
);
264 RGWBucketSyncFlowManager(CephContext
*_cct
,
265 const rgw_zone_id
& _zone_id
,
266 std::optional
<rgw_bucket
> _bucket
,
267 const RGWBucketSyncFlowManager
*_parent
);
269 void reflect(const DoutPrefixProvider
*dpp
, std::optional
<rgw_bucket
> effective_bucket
,
270 pipe_set
*flow_by_source
,
271 pipe_set
*flow_by_dest
,
272 bool only_enabled
) const;
276 static inline std::ostream
& operator<<(std::ostream
& os
, const RGWBucketSyncFlowManager::endpoints_pair
& e
) {
277 os
<< e
.dest
<< " -> " << e
.source
;
281 class RGWBucketSyncPolicyHandler
{
282 bool legacy_config
{false};
283 const RGWBucketSyncPolicyHandler
*parent
{nullptr};
284 RGWSI_Zone
*zone_svc
;
285 RGWSI_Bucket_Sync
*bucket_sync_svc
;
287 std::optional
<RGWBucketInfo
> bucket_info
;
288 std::optional
<std::map
<std::string
, bufferlist
> > bucket_attrs
;
289 std::optional
<rgw_bucket
> bucket
;
290 std::unique_ptr
<RGWBucketSyncFlowManager
> flow_mgr
;
291 rgw_sync_policy_info sync_policy
;
293 RGWBucketSyncFlowManager::pipe_set source_pipes
;
294 RGWBucketSyncFlowManager::pipe_set target_pipes
;
296 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> sources
; /* source pipes by source zone id */
297 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> targets
; /* target pipes by target zone id */
299 std::set
<rgw_zone_id
> source_zones
;
300 std::set
<rgw_zone_id
> target_zones
;
302 std::set
<rgw_bucket
> source_hints
;
303 std::set
<rgw_bucket
> target_hints
;
304 std::set
<rgw_sync_bucket_pipe
> resolved_sources
;
305 std::set
<rgw_sync_bucket_pipe
> resolved_dests
;
308 bool bucket_is_sync_source() const {
309 return !targets
.empty() || !resolved_dests
.empty();
312 bool bucket_is_sync_target() const {
313 return !sources
.empty() || !resolved_sources
.empty();
316 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler
*_parent
,
317 const RGWBucketInfo
& _bucket_info
,
318 std::map
<std::string
, bufferlist
>&& _bucket_attrs
);
320 RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler
*_parent
,
321 const rgw_bucket
& _bucket
,
322 std::optional
<rgw_sync_policy_info
> _sync_policy
);
324 RGWBucketSyncPolicyHandler(RGWSI_Zone
*_zone_svc
,
325 RGWSI_SyncModules
*sync_modules_svc
,
326 RGWSI_Bucket_Sync
*bucket_sync_svc
,
327 std::optional
<rgw_zone_id
> effective_zone
= std::nullopt
);
329 RGWBucketSyncPolicyHandler
*alloc_child(const RGWBucketInfo
& bucket_info
,
330 std::map
<std::string
, bufferlist
>&& bucket_attrs
) const;
331 RGWBucketSyncPolicyHandler
*alloc_child(const rgw_bucket
& bucket
,
332 std::optional
<rgw_sync_policy_info
> sync_policy
) const;
334 int init(const DoutPrefixProvider
*dpp
, optional_yield y
);
336 void reflect(const DoutPrefixProvider
*dpp
, RGWBucketSyncFlowManager::pipe_set
*psource_pipes
,
337 RGWBucketSyncFlowManager::pipe_set
*ptarget_pipes
,
338 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> *psources
,
339 std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
> *ptargets
,
340 std::set
<rgw_zone_id
> *psource_zones
,
341 std::set
<rgw_zone_id
> *ptarget_zones
,
342 bool only_enabled
) const;
344 void set_resolved_hints(std::set
<rgw_sync_bucket_pipe
>&& _resolved_sources
,
345 std::set
<rgw_sync_bucket_pipe
>&& _resolved_dests
) {
346 resolved_sources
= std::move(_resolved_sources
);
347 resolved_dests
= std::move(_resolved_dests
);
350 const std::set
<rgw_sync_bucket_pipe
>& get_resolved_source_hints() {
351 return resolved_sources
;
354 const std::set
<rgw_sync_bucket_pipe
>& get_resolved_dest_hints() {
355 return resolved_dests
;
358 const std::set
<rgw_zone_id
>& get_source_zones() const {
362 const std::set
<rgw_zone_id
>& get_target_zones() const {
366 const std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
>& get_sources() {
370 std::multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> get_all_sources() const;
371 std::multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> get_all_dests() const;
372 std::multimap
<rgw_zone_id
, rgw_sync_bucket_pipe
> get_all_dests_in_zone(const rgw_zone_id
& zone_id
) const;
374 const std::map
<rgw_zone_id
, RGWBucketSyncFlowManager::pipe_set
>& get_targets() {
378 const std::optional
<RGWBucketInfo
>& get_bucket_info() const {
382 const std::optional
<std::map
<std::string
, bufferlist
> >& get_bucket_attrs() const {
386 void get_pipes(RGWBucketSyncFlowManager::pipe_set
**_sources
, RGWBucketSyncFlowManager::pipe_set
**_targets
) { /* return raw pipes (with zone name) */
387 *_sources
= &source_pipes
;
388 *_targets
= &target_pipes
;
390 void get_pipes(std::set
<rgw_sync_bucket_pipe
> *sources
, std::set
<rgw_sync_bucket_pipe
> *targets
,
391 std::optional
<rgw_sync_bucket_entity
> filter_peer
);
393 const std::set
<rgw_bucket
>& get_source_hints() const {
397 const std::set
<rgw_bucket
>& get_target_hints() const {
401 bool bucket_exports_data() const;
402 bool bucket_imports_data() const;
404 const rgw_sync_policy_info
& get_sync_policy() const {
408 bool is_legacy_config() const {
409 return legacy_config
;