]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_checkpoint.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_sync_checkpoint.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2020 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#include <fmt/format.h>
17#include "common/errno.h"
18#include "rgw_sync_checkpoint.h"
19#include "rgw_sal_rados.h"
20#include "rgw_bucket_sync.h"
21#include "rgw_data_sync.h"
22#include "rgw_http_errors.h"
23#include "cls/rgw/cls_rgw_client.h"
24#include "services/svc_sys_obj.h"
25#include "services/svc_zone.h"
26#include "rgw_zone.h"
27
28#define dout_subsys ceph_subsys_rgw
29
30namespace {
31
32std::string incremental_marker(const rgw_bucket_shard_sync_info& info)
33{
f67539c2
TL
34 return BucketIndexShardsManager::get_shard_marker(info.inc_marker.position);
35}
36
37bool operator<(const std::vector<rgw_bucket_shard_sync_info>& lhs,
38 const BucketIndexShardsManager& rhs)
39{
40 for (size_t i = 0; i < lhs.size(); ++i) {
41 const auto& l = incremental_marker(lhs[i]);
42 const auto& r = rhs.get(i, "");
43 if (l < r) {
44 return true;
45 }
46 }
47 return false;
48}
49
50bool empty(const BucketIndexShardsManager& markers, int size)
51{
52 for (int i = 0; i < size; ++i) {
53 const auto& m = markers.get(i, "");
54 if (!m.empty()) {
55 return false;
56 }
57 }
58 return true;
59}
60
61std::ostream& operator<<(std::ostream& out, const std::vector<rgw_bucket_shard_sync_info>& rhs)
62{
63 const char* separator = ""; // first entry has no comma
64 out << '[';
65 for (auto& i : rhs) {
66 out << std::exchange(separator, ", ") << incremental_marker(i);
67 }
68 return out << ']';
69}
70
71std::ostream& operator<<(std::ostream& out, const BucketIndexShardsManager& rhs)
72{
73 out << '[';
74 const char* separator = ""; // first entry has no comma
75 for (auto& [i, marker] : rhs.get()) {
76 out << std::exchange(separator, ", ") << marker;
77 }
78 return out << ']';
79}
80
81int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
20effc67 82 rgw::sal::RadosStore* store,
f67539c2
TL
83 const RGWBucketInfo& bucket_info,
84 const RGWBucketInfo& source_bucket_info,
85 const rgw_sync_bucket_pipe& pipe,
1e59de90 86 uint64_t latest_gen,
f67539c2
TL
87 const BucketIndexShardsManager& remote_markers,
88 ceph::timespan retry_delay,
89 ceph::coarse_mono_time timeout_at)
90{
1e59de90
TL
91
92 const int num_shards = remote_markers.get().size();
93 rgw_bucket_sync_status full_status;
94 int r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
95 if (r < 0 && r != -ENOENT) { // retry on ENOENT
96 return r;
97 }
98
99 // wait for incremental
100 while (full_status.state != BucketSyncState::Incremental) {
101 const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
102 if (delay_until > timeout_at) {
103 lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl;
104 return -ETIMEDOUT;
105 }
106 ldout(store->ctx(), 1) << "waiting to reach incremental sync.." << dendl;
107 std::this_thread::sleep_until(delay_until);
108
109 r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
110 if (r < 0 && r != -ENOENT) { // retry on ENOENT
111 return r;
112 }
113 }
114
115 // wait for latest_gen
116 while (full_status.incremental_gen < latest_gen) {
117 const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
118 if (delay_until > timeout_at) {
119 lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach "
120 "latest generation " << latest_gen << dendl;
121 return -ETIMEDOUT;
122 }
123 ldout(store->ctx(), 1) << "waiting to reach latest gen " << latest_gen
124 << ", on " << full_status.incremental_gen << ".." << dendl;
125 std::this_thread::sleep_until(delay_until);
126
127 r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
128 if (r < 0 && r != -ENOENT) { // retry on ENOENT
129 return r;
130 }
131 }
132
133 if (full_status.incremental_gen > latest_gen) {
134 ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
135 << " local gen: " << full_status.incremental_gen << '\n'
136 << " remote gen: " << latest_gen << dendl;
137 return 0;
138 }
f67539c2
TL
139
140 if (empty(remote_markers, num_shards)) {
141 ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
142 return 0;
143 }
144
145 std::vector<rgw_bucket_shard_sync_info> status;
146 status.resize(std::max<size_t>(1, num_shards));
1e59de90
TL
147 r = rgw_read_bucket_inc_sync_status(dpp, store, pipe,
148 full_status.incremental_gen, &status);
f67539c2
TL
149 if (r < 0) {
150 return r;
151 }
152
153 while (status < remote_markers) {
1e59de90 154 const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
f67539c2
TL
155 if (delay_until > timeout_at) {
156 ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl;
157 return -ETIMEDOUT;
158 }
159 ldpp_dout(dpp, 1) << "waiting for incremental sync to catch up:\n"
160 << " local status: " << status << '\n'
161 << " remote markers: " << remote_markers << dendl;
162 std::this_thread::sleep_until(delay_until);
1e59de90
TL
163 r = rgw_read_bucket_inc_sync_status(dpp, store, pipe,
164 full_status.incremental_gen, &status);
f67539c2
TL
165 if (r < 0) {
166 return r;
167 }
168 }
169 ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
170 << " local status: " << status << '\n'
171 << " remote markers: " << remote_markers << dendl;
172 return 0;
173}
174
1e59de90
TL
175int source_bilog_info(const DoutPrefixProvider *dpp,
176 RGWSI_Zone* zone_svc,
177 const rgw_sync_bucket_pipe& pipe,
178 rgw_bucket_index_marker_info& info,
179 BucketIndexShardsManager& markers,
180 optional_yield y)
f67539c2
TL
181{
182 ceph_assert(pipe.source.zone);
183
184 auto& zone_conn_map = zone_svc->get_zone_conn_map();
185 auto conn = zone_conn_map.find(pipe.source.zone->id);
186 if (conn == zone_conn_map.end()) {
187 return -EINVAL;
188 }
189
b3b6e05e 190 return rgw_read_remote_bilog_info(dpp, conn->second, *pipe.source.bucket,
1e59de90 191 info, markers, y);
f67539c2
TL
192}
193
194} // anonymous namespace
195
196int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
20effc67 197 rgw::sal::RadosStore* store,
f67539c2
TL
198 const RGWBucketSyncPolicyHandler& policy,
199 const RGWBucketInfo& info,
200 std::optional<rgw_zone_id> opt_source_zone,
201 std::optional<rgw_bucket> opt_source_bucket,
202 ceph::timespan retry_delay,
203 ceph::coarse_mono_time timeout_at)
204{
205 struct sync_source_entry {
206 rgw_sync_bucket_pipe pipe;
1e59de90 207 uint64_t latest_gen = 0;
f67539c2
TL
208 BucketIndexShardsManager remote_markers;
209 RGWBucketInfo source_bucket_info;
210 };
211 std::list<sync_source_entry> sources;
212
213 // fetch remote markers and bucket info in parallel
214 boost::asio::io_context ioctx;
215
216 for (const auto& [source_zone_id, pipe] : policy.get_all_sources()) {
217 // filter by source zone/bucket
218 if (opt_source_zone && *opt_source_zone != *pipe.source.zone) {
219 continue;
220 }
221 if (opt_source_bucket && !opt_source_bucket->match(*pipe.source.bucket)) {
222 continue;
223 }
224 auto& entry = sources.emplace_back();
225 entry.pipe = pipe;
226
227 // fetch remote markers
20effc67 228 spawn::spawn(ioctx, [&] (yield_context yield) {
f67539c2 229 auto y = optional_yield{ioctx, yield};
1e59de90
TL
230 rgw_bucket_index_marker_info info;
231 int r = source_bilog_info(dpp, store->svc()->zone, entry.pipe,
232 info, entry.remote_markers, y);
f67539c2
TL
233 if (r < 0) {
234 ldpp_dout(dpp, 0) << "failed to fetch remote bilog markers: "
235 << cpp_strerror(r) << dendl;
236 throw std::system_error(-r, std::system_category());
237 }
1e59de90 238 entry.latest_gen = info.latest_gen;
f67539c2
TL
239 });
240 // fetch source bucket info
20effc67 241 spawn::spawn(ioctx, [&] (yield_context yield) {
f67539c2 242 auto y = optional_yield{ioctx, yield};
f67539c2 243 int r = store->getRados()->get_bucket_instance_info(
1e59de90 244 *entry.pipe.source.bucket, entry.source_bucket_info,
b3b6e05e 245 nullptr, nullptr, y, dpp);
f67539c2
TL
246 if (r < 0) {
247 ldpp_dout(dpp, 0) << "failed to read source bucket info: "
248 << cpp_strerror(r) << dendl;
249 throw std::system_error(-r, std::system_category());
250 }
251 });
252 }
253
254 try {
255 ioctx.run();
256 } catch (const std::system_error& e) {
257 return -e.code().value();
258 }
259
260 // checkpoint each source sequentially
1e59de90
TL
261 for (const auto& e : sources) {
262 int r = bucket_source_sync_checkpoint(dpp, store, info, e.source_bucket_info,
263 e.pipe, e.latest_gen, e.remote_markers,
f67539c2
TL
264 retry_delay, timeout_at);
265 if (r < 0) {
266 ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl;
267 return r;
268 }
269 }
270 ldpp_dout(dpp, 0) << "bucket checkpoint complete" << dendl;
271 return 0;
272}
273