]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include <vector> | |
5 | #include <string> | |
6 | ||
f67539c2 TL |
7 | #include "common/errno.h" |
8 | ||
9f95a23c TL |
9 | #include "rgw_trim_datalog.h" |
10 | #include "rgw_cr_rados.h" | |
11 | #include "rgw_cr_rest.h" | |
f67539c2 | 12 | #include "rgw_datalog.h" |
9f95a23c TL |
13 | #include "rgw_data_sync.h" |
14 | #include "rgw_zone.h" | |
15 | #include "rgw_bucket.h" | |
16 | ||
17 | #include "services/svc_zone.h" | |
9f95a23c TL |
18 | |
19 | #include <boost/asio/yield.hpp> | |
20 | ||
21 | #define dout_subsys ceph_subsys_rgw | |
22 | ||
23 | #undef dout_prefix | |
24 | #define dout_prefix (*_dout << "data trim: ") | |
25 | ||
26 | namespace { | |
27 | ||
f67539c2 | 28 | class DatalogTrimImplCR : public RGWSimpleCoroutine { |
b3b6e05e | 29 | const DoutPrefixProvider *dpp; |
20effc67 | 30 | rgw::sal::RadosStore* store; |
f67539c2 TL |
31 | boost::intrusive_ptr<RGWAioCompletionNotifier> cn; |
32 | int shard; | |
33 | std::string marker; | |
34 | std::string* last_trim_marker; | |
35 | ||
36 | public: | |
20effc67 | 37 | DatalogTrimImplCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, int shard, |
f67539c2 | 38 | const std::string& marker, std::string* last_trim_marker) |
b3b6e05e | 39 | : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), shard(shard), |
f67539c2 TL |
40 | marker(marker), last_trim_marker(last_trim_marker) { |
41 | set_description() << "Datalog trim shard=" << shard | |
42 | << " marker=" << marker; | |
43 | } | |
44 | ||
b3b6e05e | 45 | int send_request(const DoutPrefixProvider *dpp) override { |
f67539c2 TL |
46 | set_status() << "sending request"; |
47 | cn = stack->create_completion_notifier(); | |
b3b6e05e | 48 | return store->svc()->datalog_rados->trim_entries(dpp, shard, marker, |
f67539c2 TL |
49 | cn->completion()); |
50 | } | |
51 | int request_complete() override { | |
52 | int r = cn->completion()->get_return_value(); | |
b3b6e05e | 53 | ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "(): trim of shard=" << shard |
f67539c2 TL |
54 | << " marker=" << marker << " returned r=" << r << dendl; |
55 | ||
56 | set_status() << "request complete; ret=" << r; | |
57 | if (r != -ENODATA) { | |
58 | return r; | |
59 | } | |
60 | // nothing left to trim, update last_trim_marker | |
61 | if (*last_trim_marker < marker && | |
62 | marker != store->svc()->datalog_rados->max_marker()) { | |
63 | *last_trim_marker = marker; | |
64 | } | |
65 | return 0; | |
66 | } | |
67 | }; | |
68 | ||
9f95a23c TL |
69 | /// return the marker that it's safe to trim up to |
70 | const std::string& get_stable_marker(const rgw_data_sync_marker& m) | |
71 | { | |
72 | return m.state == m.FullSync ? m.next_step_marker : m.marker; | |
73 | } | |
74 | ||
/// lower each element of the output range starting at 'dest' to the minimum
/// stable marker reported for that position by any peer in [first, last).
///
/// the n-th entry of a peer's 'sync_markers' (in iteration order) is compared
/// against the n-th output element; entries are matched by position, not key.
/// NOTE(review): the output range is assumed to hold at least as many
/// elements as the largest 'sync_markers' container - confirm with callers
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  // an empty [first, last) simply skips the loop and leaves 'dest' untouched
  for (auto peer = first; peer != last; ++peer) {
    auto out = dest; // restart at the first output slot for every peer
    for (const auto& entry : peer->sync_markers) {
      const auto& candidate = get_stable_marker(entry.second);
      if (*out > candidate) {
        *out = candidate;
      }
      ++out;
    }
  }
}
94 | ||
95 | } // anonymous namespace | |
96 | ||
97 | class DataLogTrimCR : public RGWCoroutine { | |
f67539c2 | 98 | using TrimCR = DatalogTrimImplCR; |
b3b6e05e | 99 | const DoutPrefixProvider *dpp; |
20effc67 | 100 | rgw::sal::RadosStore* store; |
9f95a23c TL |
101 | RGWHTTPManager *http; |
102 | const int num_shards; | |
103 | const std::string& zone_id; //< my zone id | |
104 | std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer | |
105 | std::vector<std::string> min_shard_markers; //< min marker per shard | |
106 | std::vector<std::string>& last_trim; //< last trimmed marker per shard | |
107 | int ret{0}; | |
108 | ||
109 | public: | |
20effc67 | 110 | DataLogTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, RGWHTTPManager *http, |
9f95a23c | 111 | int num_shards, std::vector<std::string>& last_trim) |
b3b6e05e | 112 | : RGWCoroutine(store->ctx()), dpp(dpp), store(store), http(http), |
9f95a23c TL |
113 | num_shards(num_shards), |
114 | zone_id(store->svc()->zone->get_zone().id), | |
115 | peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()), | |
f67539c2 TL |
116 | min_shard_markers(num_shards, |
117 | std::string(store->svc()->datalog_rados->max_marker())), | |
9f95a23c TL |
118 | last_trim(last_trim) |
119 | {} | |
120 | ||
b3b6e05e | 121 | int operate(const DoutPrefixProvider *dpp) override; |
9f95a23c TL |
122 | }; |
123 | ||
b3b6e05e | 124 | int DataLogTrimCR::operate(const DoutPrefixProvider *dpp) |
9f95a23c TL |
125 | { |
126 | reenter(this) { | |
b3b6e05e | 127 | ldpp_dout(dpp, 10) << "fetching sync status for zone " << zone_id << dendl; |
9f95a23c TL |
128 | set_status("fetching sync status"); |
129 | yield { | |
130 | // query data sync status from each sync peer | |
131 | rgw_http_param_pair params[] = { | |
132 | { "type", "data" }, | |
133 | { "status", nullptr }, | |
134 | { "source-zone", zone_id.c_str() }, | |
135 | { nullptr, nullptr } | |
136 | }; | |
137 | ||
138 | auto p = peer_status.begin(); | |
139 | for (auto& c : store->svc()->zone->get_zone_data_notify_to_map()) { | |
b3b6e05e | 140 | ldpp_dout(dpp, 20) << "query sync status from " << c.first << dendl; |
9f95a23c TL |
141 | using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>; |
142 | spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), | |
143 | false); | |
144 | ++p; | |
145 | } | |
146 | } | |
147 | ||
148 | // must get a successful reply from all peers to consider trimming | |
149 | ret = 0; | |
150 | while (ret == 0 && num_spawned() > 0) { | |
151 | yield wait_for_child(); | |
152 | collect_next(&ret); | |
153 | } | |
154 | drain_all(); | |
155 | ||
156 | if (ret < 0) { | |
b3b6e05e | 157 | ldpp_dout(dpp, 4) << "failed to fetch sync status from all peers" << dendl; |
9f95a23c TL |
158 | return set_cr_error(ret); |
159 | } | |
160 | ||
b3b6e05e | 161 | ldpp_dout(dpp, 10) << "trimming log shards" << dendl; |
9f95a23c TL |
162 | set_status("trimming log shards"); |
163 | yield { | |
164 | // determine the minimum marker for each shard | |
165 | take_min_markers(peer_status.begin(), peer_status.end(), | |
166 | min_shard_markers.begin()); | |
167 | ||
168 | for (int i = 0; i < num_shards; i++) { | |
169 | const auto& m = min_shard_markers[i]; | |
170 | if (m <= last_trim[i]) { | |
171 | continue; | |
172 | } | |
b3b6e05e | 173 | ldpp_dout(dpp, 10) << "trimming log shard " << i |
9f95a23c TL |
174 | << " at marker=" << m |
175 | << " last_trim=" << last_trim[i] << dendl; | |
b3b6e05e | 176 | spawn(new TrimCR(dpp, store, i, m, &last_trim[i]), |
9f95a23c TL |
177 | true); |
178 | } | |
179 | } | |
180 | return set_cr_done(); | |
181 | } | |
182 | return 0; | |
183 | } | |
184 | ||
20effc67 | 185 | RGWCoroutine* create_admin_data_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, |
9f95a23c TL |
186 | RGWHTTPManager *http, |
187 | int num_shards, | |
188 | std::vector<std::string>& markers) | |
189 | { | |
b3b6e05e | 190 | return new DataLogTrimCR(dpp, store, http, num_shards, markers); |
9f95a23c TL |
191 | } |
192 | ||
193 | class DataLogTrimPollCR : public RGWCoroutine { | |
b3b6e05e | 194 | const DoutPrefixProvider *dpp; |
20effc67 | 195 | rgw::sal::RadosStore* store; |
9f95a23c TL |
196 | RGWHTTPManager *http; |
197 | const int num_shards; | |
198 | const utime_t interval; //< polling interval | |
199 | const std::string lock_oid; //< use first data log shard for lock | |
200 | const std::string lock_cookie; | |
201 | std::vector<std::string> last_trim; //< last trimmed marker per shard | |
202 | ||
203 | public: | |
20effc67 | 204 | DataLogTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, RGWHTTPManager *http, |
9f95a23c | 205 | int num_shards, utime_t interval) |
b3b6e05e | 206 | : RGWCoroutine(store->ctx()), dpp(dpp), store(store), http(http), |
9f95a23c | 207 | num_shards(num_shards), interval(interval), |
f67539c2 | 208 | lock_oid(store->svc()->datalog_rados->get_oid(0, 0)), |
9f95a23c TL |
209 | lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), |
210 | last_trim(num_shards) | |
211 | {} | |
212 | ||
b3b6e05e | 213 | int operate(const DoutPrefixProvider *dpp) override; |
9f95a23c TL |
214 | }; |
215 | ||
b3b6e05e | 216 | int DataLogTrimPollCR::operate(const DoutPrefixProvider *dpp) |
9f95a23c TL |
217 | { |
218 | reenter(this) { | |
219 | for (;;) { | |
220 | set_status("sleeping"); | |
221 | wait(interval); | |
222 | ||
223 | // request a 'data_trim' lock that covers the entire wait interval to | |
224 | // prevent other gateways from attempting to trim for the duration | |
225 | set_status("acquiring trim lock"); | |
226 | yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store, | |
227 | rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, lock_oid), | |
228 | "data_trim", lock_cookie, | |
229 | interval.sec())); | |
230 | if (retcode < 0) { | |
231 | // if the lock is already held, go back to sleep and try again later | |
b3b6e05e | 232 | ldpp_dout(dpp, 4) << "failed to lock " << lock_oid << ", trying again in " |
9f95a23c TL |
233 | << interval.sec() << "s" << dendl; |
234 | continue; | |
235 | } | |
236 | ||
237 | set_status("trimming"); | |
b3b6e05e | 238 | yield call(new DataLogTrimCR(dpp, store, http, num_shards, last_trim)); |
9f95a23c TL |
239 | |
240 | // note that the lock is not released. this is intentional, as it avoids | |
241 | // duplicating this work in other gateways | |
242 | } | |
243 | } | |
244 | return 0; | |
245 | } | |
246 | ||
20effc67 | 247 | RGWCoroutine* create_data_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, |
9f95a23c TL |
248 | RGWHTTPManager *http, |
249 | int num_shards, utime_t interval) | |
250 | { | |
b3b6e05e | 251 | return new DataLogTrimPollCR(dpp, store, http, num_shards, interval); |
9f95a23c | 252 | } |