// NOTE(review): removed git-blame/HTML table export residue that preceded the
// file's license header; it was not part of the original source.
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2020 Red Hat, Inc. | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #include <fmt/format.h> | |
17 | #include "common/errno.h" | |
18 | #include "rgw_sync_checkpoint.h" | |
19 | #include "rgw_sal_rados.h" | |
20 | #include "rgw_bucket_sync.h" | |
21 | #include "rgw_data_sync.h" | |
22 | #include "rgw_http_errors.h" | |
23 | #include "cls/rgw/cls_rgw_client.h" | |
24 | #include "services/svc_sys_obj.h" | |
25 | #include "services/svc_zone.h" | |
26 | #include "rgw_zone.h" | |
27 | ||
28 | #define dout_subsys ceph_subsys_rgw | |
29 | ||
30 | namespace { | |
31 | ||
32 | std::string incremental_marker(const rgw_bucket_shard_sync_info& info) | |
33 | { | |
34 | if (info.state != rgw_bucket_shard_sync_info::StateIncrementalSync) { | |
35 | return ""; | |
36 | } | |
37 | return BucketIndexShardsManager::get_shard_marker(info.inc_marker.position); | |
38 | } | |
39 | ||
40 | bool operator<(const std::vector<rgw_bucket_shard_sync_info>& lhs, | |
41 | const BucketIndexShardsManager& rhs) | |
42 | { | |
43 | for (size_t i = 0; i < lhs.size(); ++i) { | |
44 | const auto& l = incremental_marker(lhs[i]); | |
45 | const auto& r = rhs.get(i, ""); | |
46 | if (l < r) { | |
47 | return true; | |
48 | } | |
49 | } | |
50 | return false; | |
51 | } | |
52 | ||
53 | bool empty(const BucketIndexShardsManager& markers, int size) | |
54 | { | |
55 | for (int i = 0; i < size; ++i) { | |
56 | const auto& m = markers.get(i, ""); | |
57 | if (!m.empty()) { | |
58 | return false; | |
59 | } | |
60 | } | |
61 | return true; | |
62 | } | |
63 | ||
64 | std::ostream& operator<<(std::ostream& out, const std::vector<rgw_bucket_shard_sync_info>& rhs) | |
65 | { | |
66 | const char* separator = ""; // first entry has no comma | |
67 | out << '['; | |
68 | for (auto& i : rhs) { | |
69 | out << std::exchange(separator, ", ") << incremental_marker(i); | |
70 | } | |
71 | return out << ']'; | |
72 | } | |
73 | ||
74 | std::ostream& operator<<(std::ostream& out, const BucketIndexShardsManager& rhs) | |
75 | { | |
76 | out << '['; | |
77 | const char* separator = ""; // first entry has no comma | |
78 | for (auto& [i, marker] : rhs.get()) { | |
79 | out << std::exchange(separator, ", ") << marker; | |
80 | } | |
81 | return out << ']'; | |
82 | } | |
83 | ||
// Poll the local sync status of one sync pipe until every shard's
// incremental marker catches up with the remote bilog markers, or until
// the deadline passes.
//
// dpp/store:          log prefix provider and store handle
// bucket_info:        destination bucket metadata
// source_bucket_info: source bucket metadata (supplies the shard count)
// pipe:               the sync pipe being checkpointed
// remote_markers:     bilog max-markers previously fetched from the source zone
// retry_delay:        sleep between status polls
// timeout_at:         absolute deadline for the checkpoint
//
// Returns 0 when caught up, -ETIMEDOUT on deadline, or a negative error
// code propagated from rgw_bucket_sync_status().
int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
                                  rgw::sal::RGWRadosStore *store,
                                  const RGWBucketInfo& bucket_info,
                                  const RGWBucketInfo& source_bucket_info,
                                  const rgw_sync_bucket_pipe& pipe,
                                  const BucketIndexShardsManager& remote_markers,
                                  ceph::timespan retry_delay,
                                  ceph::coarse_mono_time timeout_at)
{
  const auto num_shards = source_bucket_info.layout.current_index.layout.normal.num_shards;

  // nothing to wait for if the source bucket's bilog has no entries
  if (empty(remote_markers, num_shards)) {
    ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
    return 0;
  }

  std::vector<rgw_bucket_shard_sync_info> status;
  // at least one entry even for an unsharded (num_shards == 0) bucket
  status.resize(std::max<size_t>(1, num_shards));
  int r = rgw_bucket_sync_status(dpp, store, pipe, bucket_info,
                                 &source_bucket_info, &status);
  if (r < 0) {
    return r;
  }

  // operator<(status, remote_markers) is true while any shard is behind
  while (status < remote_markers) {
    auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
    if (delay_until > timeout_at) {
      // the next poll would land past the deadline, so fail now
      ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl;
      return -ETIMEDOUT;
    }
    ldpp_dout(dpp, 1) << "waiting for incremental sync to catch up:\n"
        << " local status: " << status << '\n'
        << " remote markers: " << remote_markers << dendl;
    // blocking sleep between polls (this path runs outside the coroutines)
    std::this_thread::sleep_until(delay_until);
    r = rgw_bucket_sync_status(dpp, store, pipe, bucket_info, &source_bucket_info, &status);
    if (r < 0) {
      return r;
    }
  }
  ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
      << " local status: " << status << '\n'
      << " remote markers: " << remote_markers << dendl;
  return 0;
}
128 | ||
129 | int source_bilog_markers(RGWSI_Zone* zone_svc, | |
130 | const rgw_sync_bucket_pipe& pipe, | |
131 | BucketIndexShardsManager& remote_markers, | |
132 | optional_yield y) | |
133 | { | |
134 | ceph_assert(pipe.source.zone); | |
135 | ||
136 | auto& zone_conn_map = zone_svc->get_zone_conn_map(); | |
137 | auto conn = zone_conn_map.find(pipe.source.zone->id); | |
138 | if (conn == zone_conn_map.end()) { | |
139 | return -EINVAL; | |
140 | } | |
141 | ||
142 | return rgw_read_remote_bilog_info(conn->second, *pipe.source.bucket, | |
143 | remote_markers, y); | |
144 | } | |
145 | ||
146 | } // anonymous namespace | |
147 | ||
// Entry point for the bucket-sync checkpoint: for every sync source pipe
// of 'info' (optionally filtered by source zone and/or source bucket),
// fetch the remote bilog markers and the source bucket info concurrently,
// then wait for the local incremental sync of each source to catch up.
//
// Returns 0 once every selected source is caught up, or a negative error
// code on fetch failure or checkpoint timeout.
int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
                               rgw::sal::RGWRadosStore *store,
                               const RGWBucketSyncPolicyHandler& policy,
                               const RGWBucketInfo& info,
                               std::optional<rgw_zone_id> opt_source_zone,
                               std::optional<rgw_bucket> opt_source_bucket,
                               ceph::timespan retry_delay,
                               ceph::coarse_mono_time timeout_at)
{
  struct sync_source_entry {
    rgw_sync_bucket_pipe pipe;
    BucketIndexShardsManager remote_markers;
    RGWBucketInfo source_bucket_info;
  };
  // std::list keeps element addresses stable, so the coroutines below can
  // hold references to entries while later entries are appended
  std::list<sync_source_entry> sources;

  // fetch remote markers and bucket info in parallel
  boost::asio::io_context ioctx;

  for (const auto& [source_zone_id, pipe] : policy.get_all_sources()) {
    // filter by source zone/bucket
    if (opt_source_zone && *opt_source_zone != *pipe.source.zone) {
      continue;
    }
    if (opt_source_bucket && !opt_source_bucket->match(*pipe.source.bucket)) {
      continue;
    }
    auto& entry = sources.emplace_back();
    entry.pipe = pipe;

    // fetch remote markers
    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
      auto y = optional_yield{ioctx, yield};
      int r = source_bilog_markers(store->svc()->zone, entry.pipe,
                                   entry.remote_markers, y);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "failed to fetch remote bilog markers: "
            << cpp_strerror(r) << dendl;
        // propagate the errno out of ioctx.run() below
        throw std::system_error(-r, std::system_category());
      }
    });
    // fetch source bucket info
    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
      auto y = optional_yield{ioctx, yield};
      auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
      int r = store->getRados()->get_bucket_instance_info(
          obj_ctx, *entry.pipe.source.bucket, entry.source_bucket_info,
          nullptr, nullptr, y);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "failed to read source bucket info: "
            << cpp_strerror(r) << dendl;
        throw std::system_error(-r, std::system_category());
      }
    });
  }

  try {
    // drive all spawned coroutines to completion
    ioctx.run();
  } catch (const std::system_error& e) {
    return -e.code().value(); // convert the thrown errno back to rgw's convention
  }

  // checkpoint each source sequentially
  for (const auto& [pipe, remote_markers, source_bucket_info] : sources) {
    int r = bucket_source_sync_checkpoint(dpp, store, info, source_bucket_info,
                                          pipe, remote_markers,
                                          retry_delay, timeout_at);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl;
      return r;
    }
  }
  ldpp_dout(dpp, 0) << "bucket checkpoint complete" << dendl;
  return 0;
}
223 |