]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2020 Red Hat, Inc. | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #include <fmt/format.h> | |
17 | #include "common/errno.h" | |
18 | #include "rgw_sync_checkpoint.h" | |
19 | #include "rgw_sal_rados.h" | |
20 | #include "rgw_bucket_sync.h" | |
21 | #include "rgw_data_sync.h" | |
22 | #include "rgw_http_errors.h" | |
23 | #include "cls/rgw/cls_rgw_client.h" | |
24 | #include "services/svc_sys_obj.h" | |
25 | #include "services/svc_zone.h" | |
26 | #include "rgw_zone.h" | |
27 | ||
28 | #define dout_subsys ceph_subsys_rgw | |
29 | ||
30 | namespace { | |
31 | ||
32 | std::string incremental_marker(const rgw_bucket_shard_sync_info& info) | |
33 | { | |
f67539c2 TL |
34 | return BucketIndexShardsManager::get_shard_marker(info.inc_marker.position); |
35 | } | |
36 | ||
37 | bool operator<(const std::vector<rgw_bucket_shard_sync_info>& lhs, | |
38 | const BucketIndexShardsManager& rhs) | |
39 | { | |
40 | for (size_t i = 0; i < lhs.size(); ++i) { | |
41 | const auto& l = incremental_marker(lhs[i]); | |
42 | const auto& r = rhs.get(i, ""); | |
43 | if (l < r) { | |
44 | return true; | |
45 | } | |
46 | } | |
47 | return false; | |
48 | } | |
49 | ||
50 | bool empty(const BucketIndexShardsManager& markers, int size) | |
51 | { | |
52 | for (int i = 0; i < size; ++i) { | |
53 | const auto& m = markers.get(i, ""); | |
54 | if (!m.empty()) { | |
55 | return false; | |
56 | } | |
57 | } | |
58 | return true; | |
59 | } | |
60 | ||
61 | std::ostream& operator<<(std::ostream& out, const std::vector<rgw_bucket_shard_sync_info>& rhs) | |
62 | { | |
63 | const char* separator = ""; // first entry has no comma | |
64 | out << '['; | |
65 | for (auto& i : rhs) { | |
66 | out << std::exchange(separator, ", ") << incremental_marker(i); | |
67 | } | |
68 | return out << ']'; | |
69 | } | |
70 | ||
71 | std::ostream& operator<<(std::ostream& out, const BucketIndexShardsManager& rhs) | |
72 | { | |
73 | out << '['; | |
74 | const char* separator = ""; // first entry has no comma | |
75 | for (auto& [i, marker] : rhs.get()) { | |
76 | out << std::exchange(separator, ", ") << marker; | |
77 | } | |
78 | return out << ']'; | |
79 | } | |
80 | ||
81 | int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp, | |
20effc67 | 82 | rgw::sal::RadosStore* store, |
f67539c2 TL |
83 | const RGWBucketInfo& bucket_info, |
84 | const RGWBucketInfo& source_bucket_info, | |
85 | const rgw_sync_bucket_pipe& pipe, | |
1e59de90 | 86 | uint64_t latest_gen, |
f67539c2 TL |
87 | const BucketIndexShardsManager& remote_markers, |
88 | ceph::timespan retry_delay, | |
89 | ceph::coarse_mono_time timeout_at) | |
90 | { | |
1e59de90 TL |
91 | |
92 | const int num_shards = remote_markers.get().size(); | |
93 | rgw_bucket_sync_status full_status; | |
94 | int r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield); | |
95 | if (r < 0 && r != -ENOENT) { // retry on ENOENT | |
96 | return r; | |
97 | } | |
98 | ||
99 | // wait for incremental | |
100 | while (full_status.state != BucketSyncState::Incremental) { | |
101 | const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; | |
102 | if (delay_until > timeout_at) { | |
103 | lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl; | |
104 | return -ETIMEDOUT; | |
105 | } | |
106 | ldout(store->ctx(), 1) << "waiting to reach incremental sync.." << dendl; | |
107 | std::this_thread::sleep_until(delay_until); | |
108 | ||
109 | r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield); | |
110 | if (r < 0 && r != -ENOENT) { // retry on ENOENT | |
111 | return r; | |
112 | } | |
113 | } | |
114 | ||
115 | // wait for latest_gen | |
116 | while (full_status.incremental_gen < latest_gen) { | |
117 | const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; | |
118 | if (delay_until > timeout_at) { | |
119 | lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach " | |
120 | "latest generation " << latest_gen << dendl; | |
121 | return -ETIMEDOUT; | |
122 | } | |
123 | ldout(store->ctx(), 1) << "waiting to reach latest gen " << latest_gen | |
124 | << ", on " << full_status.incremental_gen << ".." << dendl; | |
125 | std::this_thread::sleep_until(delay_until); | |
126 | ||
127 | r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield); | |
128 | if (r < 0 && r != -ENOENT) { // retry on ENOENT | |
129 | return r; | |
130 | } | |
131 | } | |
132 | ||
133 | if (full_status.incremental_gen > latest_gen) { | |
134 | ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n" | |
135 | << " local gen: " << full_status.incremental_gen << '\n' | |
136 | << " remote gen: " << latest_gen << dendl; | |
137 | return 0; | |
138 | } | |
f67539c2 TL |
139 | |
140 | if (empty(remote_markers, num_shards)) { | |
141 | ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl; | |
142 | return 0; | |
143 | } | |
144 | ||
145 | std::vector<rgw_bucket_shard_sync_info> status; | |
146 | status.resize(std::max<size_t>(1, num_shards)); | |
1e59de90 TL |
147 | r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, |
148 | full_status.incremental_gen, &status); | |
f67539c2 TL |
149 | if (r < 0) { |
150 | return r; | |
151 | } | |
152 | ||
153 | while (status < remote_markers) { | |
1e59de90 | 154 | const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; |
f67539c2 TL |
155 | if (delay_until > timeout_at) { |
156 | ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl; | |
157 | return -ETIMEDOUT; | |
158 | } | |
159 | ldpp_dout(dpp, 1) << "waiting for incremental sync to catch up:\n" | |
160 | << " local status: " << status << '\n' | |
161 | << " remote markers: " << remote_markers << dendl; | |
162 | std::this_thread::sleep_until(delay_until); | |
1e59de90 TL |
163 | r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, |
164 | full_status.incremental_gen, &status); | |
f67539c2 TL |
165 | if (r < 0) { |
166 | return r; | |
167 | } | |
168 | } | |
169 | ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n" | |
170 | << " local status: " << status << '\n' | |
171 | << " remote markers: " << remote_markers << dendl; | |
172 | return 0; | |
173 | } | |
174 | ||
1e59de90 TL |
175 | int source_bilog_info(const DoutPrefixProvider *dpp, |
176 | RGWSI_Zone* zone_svc, | |
177 | const rgw_sync_bucket_pipe& pipe, | |
178 | rgw_bucket_index_marker_info& info, | |
179 | BucketIndexShardsManager& markers, | |
180 | optional_yield y) | |
f67539c2 TL |
181 | { |
182 | ceph_assert(pipe.source.zone); | |
183 | ||
184 | auto& zone_conn_map = zone_svc->get_zone_conn_map(); | |
185 | auto conn = zone_conn_map.find(pipe.source.zone->id); | |
186 | if (conn == zone_conn_map.end()) { | |
187 | return -EINVAL; | |
188 | } | |
189 | ||
b3b6e05e | 190 | return rgw_read_remote_bilog_info(dpp, conn->second, *pipe.source.bucket, |
1e59de90 | 191 | info, markers, y); |
f67539c2 TL |
192 | } |
193 | ||
194 | } // anonymous namespace | |
195 | ||
196 | int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, | |
20effc67 | 197 | rgw::sal::RadosStore* store, |
f67539c2 TL |
198 | const RGWBucketSyncPolicyHandler& policy, |
199 | const RGWBucketInfo& info, | |
200 | std::optional<rgw_zone_id> opt_source_zone, | |
201 | std::optional<rgw_bucket> opt_source_bucket, | |
202 | ceph::timespan retry_delay, | |
203 | ceph::coarse_mono_time timeout_at) | |
204 | { | |
205 | struct sync_source_entry { | |
206 | rgw_sync_bucket_pipe pipe; | |
1e59de90 | 207 | uint64_t latest_gen = 0; |
f67539c2 TL |
208 | BucketIndexShardsManager remote_markers; |
209 | RGWBucketInfo source_bucket_info; | |
210 | }; | |
211 | std::list<sync_source_entry> sources; | |
212 | ||
213 | // fetch remote markers and bucket info in parallel | |
214 | boost::asio::io_context ioctx; | |
215 | ||
216 | for (const auto& [source_zone_id, pipe] : policy.get_all_sources()) { | |
217 | // filter by source zone/bucket | |
218 | if (opt_source_zone && *opt_source_zone != *pipe.source.zone) { | |
219 | continue; | |
220 | } | |
221 | if (opt_source_bucket && !opt_source_bucket->match(*pipe.source.bucket)) { | |
222 | continue; | |
223 | } | |
224 | auto& entry = sources.emplace_back(); | |
225 | entry.pipe = pipe; | |
226 | ||
227 | // fetch remote markers | |
20effc67 | 228 | spawn::spawn(ioctx, [&] (yield_context yield) { |
f67539c2 | 229 | auto y = optional_yield{ioctx, yield}; |
1e59de90 TL |
230 | rgw_bucket_index_marker_info info; |
231 | int r = source_bilog_info(dpp, store->svc()->zone, entry.pipe, | |
232 | info, entry.remote_markers, y); | |
f67539c2 TL |
233 | if (r < 0) { |
234 | ldpp_dout(dpp, 0) << "failed to fetch remote bilog markers: " | |
235 | << cpp_strerror(r) << dendl; | |
236 | throw std::system_error(-r, std::system_category()); | |
237 | } | |
1e59de90 | 238 | entry.latest_gen = info.latest_gen; |
f67539c2 TL |
239 | }); |
240 | // fetch source bucket info | |
20effc67 | 241 | spawn::spawn(ioctx, [&] (yield_context yield) { |
f67539c2 | 242 | auto y = optional_yield{ioctx, yield}; |
f67539c2 | 243 | int r = store->getRados()->get_bucket_instance_info( |
1e59de90 | 244 | *entry.pipe.source.bucket, entry.source_bucket_info, |
b3b6e05e | 245 | nullptr, nullptr, y, dpp); |
f67539c2 TL |
246 | if (r < 0) { |
247 | ldpp_dout(dpp, 0) << "failed to read source bucket info: " | |
248 | << cpp_strerror(r) << dendl; | |
249 | throw std::system_error(-r, std::system_category()); | |
250 | } | |
251 | }); | |
252 | } | |
253 | ||
254 | try { | |
255 | ioctx.run(); | |
256 | } catch (const std::system_error& e) { | |
257 | return -e.code().value(); | |
258 | } | |
259 | ||
260 | // checkpoint each source sequentially | |
1e59de90 TL |
261 | for (const auto& e : sources) { |
262 | int r = bucket_source_sync_checkpoint(dpp, store, info, e.source_bucket_info, | |
263 | e.pipe, e.latest_gen, e.remote_markers, | |
f67539c2 TL |
264 | retry_delay, timeout_at); |
265 | if (r < 0) { | |
266 | ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl; | |
267 | return r; | |
268 | } | |
269 | } | |
270 | ldpp_dout(dpp, 0) << "bucket checkpoint complete" << dendl; | |
271 | return 0; | |
272 | } | |
273 |