1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2020 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #include <fmt/format.h>
17 #include "common/errno.h"
18 #include "rgw_sync_checkpoint.h"
19 #include "rgw_sal_rados.h"
20 #include "rgw_bucket_sync.h"
21 #include "rgw_data_sync.h"
22 #include "rgw_http_errors.h"
23 #include "cls/rgw/cls_rgw_client.h"
24 #include "services/svc_sys_obj.h"
25 #include "services/svc_zone.h"
28 #define dout_subsys ceph_subsys_rgw
32 std::string
incremental_marker(const rgw_bucket_shard_sync_info
& info
)
34 if (info
.state
!= rgw_bucket_shard_sync_info::StateIncrementalSync
) {
37 return BucketIndexShardsManager::get_shard_marker(info
.inc_marker
.position
);
40 bool operator<(const std::vector
<rgw_bucket_shard_sync_info
>& lhs
,
41 const BucketIndexShardsManager
& rhs
)
43 for (size_t i
= 0; i
< lhs
.size(); ++i
) {
44 const auto& l
= incremental_marker(lhs
[i
]);
45 const auto& r
= rhs
.get(i
, "");
53 bool empty(const BucketIndexShardsManager
& markers
, int size
)
55 for (int i
= 0; i
< size
; ++i
) {
56 const auto& m
= markers
.get(i
, "");
64 std::ostream
& operator<<(std::ostream
& out
, const std::vector
<rgw_bucket_shard_sync_info
>& rhs
)
66 const char* separator
= ""; // first entry has no comma
69 out
<< std::exchange(separator
, ", ") << incremental_marker(i
);
74 std::ostream
& operator<<(std::ostream
& out
, const BucketIndexShardsManager
& rhs
)
77 const char* separator
= ""; // first entry has no comma
78 for (auto& [i
, marker
] : rhs
.get()) {
79 out
<< std::exchange(separator
, ", ") << marker
;
84 int bucket_source_sync_checkpoint(const DoutPrefixProvider
* dpp
,
85 rgw::sal::RGWRadosStore
*store
,
86 const RGWBucketInfo
& bucket_info
,
87 const RGWBucketInfo
& source_bucket_info
,
88 const rgw_sync_bucket_pipe
& pipe
,
89 const BucketIndexShardsManager
& remote_markers
,
90 ceph::timespan retry_delay
,
91 ceph::coarse_mono_time timeout_at
)
93 const auto num_shards
= source_bucket_info
.layout
.current_index
.layout
.normal
.num_shards
;
95 if (empty(remote_markers
, num_shards
)) {
96 ldpp_dout(dpp
, 1) << "bucket sync caught up with empty source" << dendl
;
100 std::vector
<rgw_bucket_shard_sync_info
> status
;
101 status
.resize(std::max
<size_t>(1, num_shards
));
102 int r
= rgw_bucket_sync_status(dpp
, store
, pipe
, bucket_info
,
103 &source_bucket_info
, &status
);
108 while (status
< remote_markers
) {
109 auto delay_until
= ceph::coarse_mono_clock::now() + retry_delay
;
110 if (delay_until
> timeout_at
) {
111 ldpp_dout(dpp
, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl
;
114 ldpp_dout(dpp
, 1) << "waiting for incremental sync to catch up:\n"
115 << " local status: " << status
<< '\n'
116 << " remote markers: " << remote_markers
<< dendl
;
117 std::this_thread::sleep_until(delay_until
);
118 r
= rgw_bucket_sync_status(dpp
, store
, pipe
, bucket_info
, &source_bucket_info
, &status
);
123 ldpp_dout(dpp
, 1) << "bucket sync caught up with source:\n"
124 << " local status: " << status
<< '\n'
125 << " remote markers: " << remote_markers
<< dendl
;
129 int source_bilog_markers(const DoutPrefixProvider
*dpp
,
130 RGWSI_Zone
* zone_svc
,
131 const rgw_sync_bucket_pipe
& pipe
,
132 BucketIndexShardsManager
& remote_markers
,
135 ceph_assert(pipe
.source
.zone
);
137 auto& zone_conn_map
= zone_svc
->get_zone_conn_map();
138 auto conn
= zone_conn_map
.find(pipe
.source
.zone
->id
);
139 if (conn
== zone_conn_map
.end()) {
143 return rgw_read_remote_bilog_info(dpp
, conn
->second
, *pipe
.source
.bucket
,
147 } // anonymous namespace
149 int rgw_bucket_sync_checkpoint(const DoutPrefixProvider
* dpp
,
150 rgw::sal::RGWRadosStore
*store
,
151 const RGWBucketSyncPolicyHandler
& policy
,
152 const RGWBucketInfo
& info
,
153 std::optional
<rgw_zone_id
> opt_source_zone
,
154 std::optional
<rgw_bucket
> opt_source_bucket
,
155 ceph::timespan retry_delay
,
156 ceph::coarse_mono_time timeout_at
)
158 struct sync_source_entry
{
159 rgw_sync_bucket_pipe pipe
;
160 BucketIndexShardsManager remote_markers
;
161 RGWBucketInfo source_bucket_info
;
163 std::list
<sync_source_entry
> sources
;
165 // fetch remote markers and bucket info in parallel
166 boost::asio::io_context ioctx
;
168 for (const auto& [source_zone_id
, pipe
] : policy
.get_all_sources()) {
169 // filter by source zone/bucket
170 if (opt_source_zone
&& *opt_source_zone
!= *pipe
.source
.zone
) {
173 if (opt_source_bucket
&& !opt_source_bucket
->match(*pipe
.source
.bucket
)) {
176 auto& entry
= sources
.emplace_back();
179 // fetch remote markers
180 spawn::spawn(ioctx
, [&] (spawn::yield_context yield
) {
181 auto y
= optional_yield
{ioctx
, yield
};
182 int r
= source_bilog_markers(dpp
, store
->svc()->zone
, entry
.pipe
,
183 entry
.remote_markers
, y
);
185 ldpp_dout(dpp
, 0) << "failed to fetch remote bilog markers: "
186 << cpp_strerror(r
) << dendl
;
187 throw std::system_error(-r
, std::system_category());
190 // fetch source bucket info
191 spawn::spawn(ioctx
, [&] (spawn::yield_context yield
) {
192 auto y
= optional_yield
{ioctx
, yield
};
193 auto obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
194 int r
= store
->getRados()->get_bucket_instance_info(
195 obj_ctx
, *entry
.pipe
.source
.bucket
, entry
.source_bucket_info
,
196 nullptr, nullptr, y
, dpp
);
198 ldpp_dout(dpp
, 0) << "failed to read source bucket info: "
199 << cpp_strerror(r
) << dendl
;
200 throw std::system_error(-r
, std::system_category());
207 } catch (const std::system_error
& e
) {
208 return -e
.code().value();
211 // checkpoint each source sequentially
212 for (const auto& [pipe
, remote_markers
, source_bucket_info
] : sources
) {
213 int r
= bucket_source_sync_checkpoint(dpp
, store
, info
, source_bucket_info
,
214 pipe
, remote_markers
,
215 retry_delay
, timeout_at
);
217 ldpp_dout(dpp
, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r
) << dendl
;
221 ldpp_dout(dpp
, 0) << "bucket checkpoint complete" << dendl
;