// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2020 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
#include <algorithm>
#include <list>
#include <optional>
#include <ostream>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

#include <fmt/format.h>

#include "common/errno.h"
#include "rgw_sync_checkpoint.h"
#include "rgw_sal_rados.h"
#include "rgw_bucket_sync.h"
#include "rgw_data_sync.h"
#include "rgw_http_errors.h"
#include "cls/rgw/cls_rgw_client.h"
#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"

#define dout_subsys ceph_subsys_rgw

// NOTE(review): this opener pairs with the '} // anonymous namespace'
// closer later in the file; it was lost in the original paste.
namespace {
32 std::string
incremental_marker(const rgw_bucket_shard_sync_info
& info
)
34 return BucketIndexShardsManager::get_shard_marker(info
.inc_marker
.position
);
37 bool operator<(const std::vector
<rgw_bucket_shard_sync_info
>& lhs
,
38 const BucketIndexShardsManager
& rhs
)
40 for (size_t i
= 0; i
< lhs
.size(); ++i
) {
41 const auto& l
= incremental_marker(lhs
[i
]);
42 const auto& r
= rhs
.get(i
, "");
50 bool empty(const BucketIndexShardsManager
& markers
, int size
)
52 for (int i
= 0; i
< size
; ++i
) {
53 const auto& m
= markers
.get(i
, "");
61 std::ostream
& operator<<(std::ostream
& out
, const std::vector
<rgw_bucket_shard_sync_info
>& rhs
)
63 const char* separator
= ""; // first entry has no comma
66 out
<< std::exchange(separator
, ", ") << incremental_marker(i
);
71 std::ostream
& operator<<(std::ostream
& out
, const BucketIndexShardsManager
& rhs
)
74 const char* separator
= ""; // first entry has no comma
75 for (auto& [i
, marker
] : rhs
.get()) {
76 out
<< std::exchange(separator
, ", ") << marker
;
81 int bucket_source_sync_checkpoint(const DoutPrefixProvider
* dpp
,
82 rgw::sal::RadosStore
* store
,
83 const RGWBucketInfo
& bucket_info
,
84 const RGWBucketInfo
& source_bucket_info
,
85 const rgw_sync_bucket_pipe
& pipe
,
87 const BucketIndexShardsManager
& remote_markers
,
88 ceph::timespan retry_delay
,
89 ceph::coarse_mono_time timeout_at
)
92 const int num_shards
= remote_markers
.get().size();
93 rgw_bucket_sync_status full_status
;
94 int r
= rgw_read_bucket_full_sync_status(dpp
, store
, pipe
, &full_status
, null_yield
);
95 if (r
< 0 && r
!= -ENOENT
) { // retry on ENOENT
99 // wait for incremental
100 while (full_status
.state
!= BucketSyncState::Incremental
) {
101 const auto delay_until
= ceph::coarse_mono_clock::now() + retry_delay
;
102 if (delay_until
> timeout_at
) {
103 lderr(store
->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl
;
106 ldout(store
->ctx(), 1) << "waiting to reach incremental sync.." << dendl
;
107 std::this_thread::sleep_until(delay_until
);
109 r
= rgw_read_bucket_full_sync_status(dpp
, store
, pipe
, &full_status
, null_yield
);
110 if (r
< 0 && r
!= -ENOENT
) { // retry on ENOENT
115 // wait for latest_gen
116 while (full_status
.incremental_gen
< latest_gen
) {
117 const auto delay_until
= ceph::coarse_mono_clock::now() + retry_delay
;
118 if (delay_until
> timeout_at
) {
119 lderr(store
->ctx()) << "bucket checkpoint timed out waiting to reach "
120 "latest generation " << latest_gen
<< dendl
;
123 ldout(store
->ctx(), 1) << "waiting to reach latest gen " << latest_gen
124 << ", on " << full_status
.incremental_gen
<< ".." << dendl
;
125 std::this_thread::sleep_until(delay_until
);
127 r
= rgw_read_bucket_full_sync_status(dpp
, store
, pipe
, &full_status
, null_yield
);
128 if (r
< 0 && r
!= -ENOENT
) { // retry on ENOENT
133 if (full_status
.incremental_gen
> latest_gen
) {
134 ldpp_dout(dpp
, 1) << "bucket sync caught up with source:\n"
135 << " local gen: " << full_status
.incremental_gen
<< '\n'
136 << " remote gen: " << latest_gen
<< dendl
;
140 if (empty(remote_markers
, num_shards
)) {
141 ldpp_dout(dpp
, 1) << "bucket sync caught up with empty source" << dendl
;
145 std::vector
<rgw_bucket_shard_sync_info
> status
;
146 status
.resize(std::max
<size_t>(1, num_shards
));
147 r
= rgw_read_bucket_inc_sync_status(dpp
, store
, pipe
,
148 full_status
.incremental_gen
, &status
);
153 while (status
< remote_markers
) {
154 const auto delay_until
= ceph::coarse_mono_clock::now() + retry_delay
;
155 if (delay_until
> timeout_at
) {
156 ldpp_dout(dpp
, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl
;
159 ldpp_dout(dpp
, 1) << "waiting for incremental sync to catch up:\n"
160 << " local status: " << status
<< '\n'
161 << " remote markers: " << remote_markers
<< dendl
;
162 std::this_thread::sleep_until(delay_until
);
163 r
= rgw_read_bucket_inc_sync_status(dpp
, store
, pipe
,
164 full_status
.incremental_gen
, &status
);
169 ldpp_dout(dpp
, 1) << "bucket sync caught up with source:\n"
170 << " local status: " << status
<< '\n'
171 << " remote markers: " << remote_markers
<< dendl
;
175 int source_bilog_info(const DoutPrefixProvider
*dpp
,
176 RGWSI_Zone
* zone_svc
,
177 const rgw_sync_bucket_pipe
& pipe
,
178 rgw_bucket_index_marker_info
& info
,
179 BucketIndexShardsManager
& markers
,
182 ceph_assert(pipe
.source
.zone
);
184 auto& zone_conn_map
= zone_svc
->get_zone_conn_map();
185 auto conn
= zone_conn_map
.find(pipe
.source
.zone
->id
);
186 if (conn
== zone_conn_map
.end()) {
190 return rgw_read_remote_bilog_info(dpp
, conn
->second
, *pipe
.source
.bucket
,
} // anonymous namespace
196 int rgw_bucket_sync_checkpoint(const DoutPrefixProvider
* dpp
,
197 rgw::sal::RadosStore
* store
,
198 const RGWBucketSyncPolicyHandler
& policy
,
199 const RGWBucketInfo
& info
,
200 std::optional
<rgw_zone_id
> opt_source_zone
,
201 std::optional
<rgw_bucket
> opt_source_bucket
,
202 ceph::timespan retry_delay
,
203 ceph::coarse_mono_time timeout_at
)
205 struct sync_source_entry
{
206 rgw_sync_bucket_pipe pipe
;
207 uint64_t latest_gen
= 0;
208 BucketIndexShardsManager remote_markers
;
209 RGWBucketInfo source_bucket_info
;
211 std::list
<sync_source_entry
> sources
;
213 // fetch remote markers and bucket info in parallel
214 boost::asio::io_context ioctx
;
216 for (const auto& [source_zone_id
, pipe
] : policy
.get_all_sources()) {
217 // filter by source zone/bucket
218 if (opt_source_zone
&& *opt_source_zone
!= *pipe
.source
.zone
) {
221 if (opt_source_bucket
&& !opt_source_bucket
->match(*pipe
.source
.bucket
)) {
224 auto& entry
= sources
.emplace_back();
227 // fetch remote markers
228 spawn::spawn(ioctx
, [&] (yield_context yield
) {
229 auto y
= optional_yield
{ioctx
, yield
};
230 rgw_bucket_index_marker_info info
;
231 int r
= source_bilog_info(dpp
, store
->svc()->zone
, entry
.pipe
,
232 info
, entry
.remote_markers
, y
);
234 ldpp_dout(dpp
, 0) << "failed to fetch remote bilog markers: "
235 << cpp_strerror(r
) << dendl
;
236 throw std::system_error(-r
, std::system_category());
238 entry
.latest_gen
= info
.latest_gen
;
240 // fetch source bucket info
241 spawn::spawn(ioctx
, [&] (yield_context yield
) {
242 auto y
= optional_yield
{ioctx
, yield
};
243 int r
= store
->getRados()->get_bucket_instance_info(
244 *entry
.pipe
.source
.bucket
, entry
.source_bucket_info
,
245 nullptr, nullptr, y
, dpp
);
247 ldpp_dout(dpp
, 0) << "failed to read source bucket info: "
248 << cpp_strerror(r
) << dendl
;
249 throw std::system_error(-r
, std::system_category());
256 } catch (const std::system_error
& e
) {
257 return -e
.code().value();
260 // checkpoint each source sequentially
261 for (const auto& e
: sources
) {
262 int r
= bucket_source_sync_checkpoint(dpp
, store
, info
, e
.source_bucket_info
,
263 e
.pipe
, e
.latest_gen
, e
.remote_markers
,
264 retry_delay
, timeout_at
);
266 ldpp_dout(dpp
, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r
) << dendl
;
270 ldpp_dout(dpp
, 0) << "bucket checkpoint complete" << dendl
;