]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "common/errno.h" | |
5 | ||
6 | #include "rgw_trim_mdlog.h" | |
7 | #include "rgw_sync.h" | |
8 | #include "rgw_cr_rados.h" | |
9 | #include "rgw_cr_rest.h" | |
9f95a23c TL |
10 | #include "rgw_zone.h" |
11 | #include "services/svc_zone.h" | |
12 | #include "services/svc_meta.h" | |
13 | #include "services/svc_mdlog.h" | |
14 | #include "services/svc_cls.h" | |
15 | ||
16 | #include <boost/asio/yield.hpp> | |
17 | ||
18 | #define dout_subsys ceph_subsys_rgw | |
19 | ||
20 | #undef dout_prefix | |
21 | #define dout_prefix (*_dout << "meta trim: ") | |
22 | ||
23 | /// purge all log shards for the given mdlog | |
24 | class PurgeLogShardsCR : public RGWShardCollectCR { | |
25 | rgw::sal::RGWRadosStore *const store; | |
26 | const RGWMetadataLog* mdlog; | |
27 | const int num_shards; | |
28 | rgw_raw_obj obj; | |
29 | int i{0}; | |
30 | ||
31 | static constexpr int max_concurrent = 16; | |
32 | ||
33 | public: | |
34 | PurgeLogShardsCR(rgw::sal::RGWRadosStore *store, const RGWMetadataLog* mdlog, | |
35 | const rgw_pool& pool, int num_shards) | |
36 | : RGWShardCollectCR(store->ctx(), max_concurrent), | |
37 | store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "") | |
38 | {} | |
39 | ||
40 | bool spawn_next() override { | |
41 | if (i == num_shards) { | |
42 | return false; | |
43 | } | |
44 | mdlog->get_shard_oid(i++, obj.oid); | |
45 | spawn(new RGWRadosRemoveCR(store, obj), false); | |
46 | return true; | |
47 | } | |
48 | }; | |
49 | ||
50 | using Cursor = RGWPeriodHistory::Cursor; | |
51 | ||
52 | /// purge mdlogs from the oldest up to (but not including) the given realm_epoch | |
/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
class PurgePeriodLogsCR : public RGWCoroutine {
  // service handles resolved once at construction
  struct Svc {
    RGWSI_Zone *zone;
    RGWSI_MDLog *mdlog;
  } svc;
  rgw::sal::RGWRadosStore *const store;
  // NOTE(review): 'metadata' is initialized below but not referenced by
  // operate() in this file — possibly kept for historical reasons; confirm
  RGWMetadataManager *const metadata;
  RGWObjVersionTracker objv; //< version tracker for the mdlog-history object
  Cursor cursor; //< walks the period history starting at the oldest log period
  epoch_t realm_epoch; //< purge stops before this realm epoch
  epoch_t *last_trim_epoch; //< update last trim on success

 public:
  PurgePeriodLogsCR(rgw::sal::RGWRadosStore *store, epoch_t realm_epoch, epoch_t *last_trim)
    : RGWCoroutine(store->ctx()), store(store), metadata(store->ctl()->meta.mgr),
      realm_epoch(realm_epoch), last_trim_epoch(last_trim) {
    svc.zone = store->svc()->zone;
    svc.mdlog = store->svc()->mdlog;
  }

  int operate() override;
};
75 | ||
int PurgePeriodLogsCR::operate()
{
  reenter(this) {
    // read our current oldest log period
    yield call(svc.mdlog->read_oldest_log_period_cr(&cursor, &objv));
    if (retcode < 0) {
      return set_cr_error(retcode);
    }
    ceph_assert(cursor);
    ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
        << " period=" << cursor.get_period().get_id() << dendl;

    // trim -up to- the given realm_epoch
    while (cursor.get_epoch() < realm_epoch) {
      ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
          << " period=" << cursor.get_period().get_id() << dendl;
      yield {
        // remove all shard objects of this period's mdlog
        const auto mdlog = svc.mdlog->get_log(cursor.get_period().get_id());
        const auto& pool = svc.zone->get_zone_params().log_pool;
        auto num_shards = cct->_conf->rgw_md_log_max_shards;
        call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
      }
      if (retcode < 0) {
        ldout(cct, 1) << "failed to remove log shards: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
          << " period=" << cursor.get_period().get_id() << dendl;

      // update our mdlog history
      yield call(svc.mdlog->trim_log_period_cr(cursor, &objv));
      if (retcode == -ENOENT) {
        // must have raced to update mdlog history. return success and allow the
        // winner to continue purging
        ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
            << " period=" << cursor.get_period().get_id() << dendl;
        return set_cr_done();
      } else if (retcode < 0) {
        ldout(cct, 1) << "failed to remove log shards for realm_epoch="
            << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
            << " with: " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      // record progress so the caller can skip this epoch next time
      if (*last_trim_epoch < cursor.get_epoch()) {
        *last_trim_epoch = cursor.get_epoch();
      }

      ceph_assert(cursor.has_next()); // get_current() should always come after
      cursor.next();
    }
    return set_cr_done();
  }
  return 0;
}
132 | ||
133 | namespace { | |
134 | ||
135 | using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>; | |
136 | ||
137 | /// construct a RGWRESTConn for each zone in the realm | |
138 | template <typename Zonegroups> | |
139 | connection_map make_peer_connections(rgw::sal::RGWRadosStore *store, | |
140 | const Zonegroups& zonegroups) | |
141 | { | |
142 | connection_map connections; | |
143 | for (auto& g : zonegroups) { | |
144 | for (auto& z : g.second.zones) { | |
145 | std::unique_ptr<RGWRESTConn> conn{ | |
146 | new RGWRESTConn(store->ctx(), store->svc()->zone, z.first.id, z.second.endpoints)}; | |
147 | connections.emplace(z.first.id, std::move(conn)); | |
148 | } | |
149 | } | |
150 | return connections; | |
151 | } | |
152 | ||
153 | /// return the marker that it's safe to trim up to | |
154 | const std::string& get_stable_marker(const rgw_meta_sync_marker& m) | |
155 | { | |
156 | return m.state == m.FullSync ? m.next_step_marker : m.marker; | |
157 | } | |
158 | ||
159 | /// comparison operator for take_min_status() | |
160 | bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs) | |
161 | { | |
162 | // sort by stable marker | |
163 | return get_stable_marker(lhs) < get_stable_marker(rhs); | |
164 | } | |
165 | ||
166 | /// populate the status with the minimum stable marker of each shard for any | |
167 | /// peer whose realm_epoch matches the minimum realm_epoch in the input | |
168 | template <typename Iter> | |
169 | int take_min_status(CephContext *cct, Iter first, Iter last, | |
170 | rgw_meta_sync_status *status) | |
171 | { | |
172 | if (first == last) { | |
173 | return -EINVAL; | |
174 | } | |
175 | const size_t num_shards = cct->_conf->rgw_md_log_max_shards; | |
176 | ||
177 | status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max(); | |
178 | for (auto p = first; p != last; ++p) { | |
179 | // validate peer's shard count | |
180 | if (p->sync_markers.size() != num_shards) { | |
181 | ldout(cct, 1) << "take_min_status got peer status with " | |
182 | << p->sync_markers.size() << " shards, expected " | |
183 | << num_shards << dendl; | |
184 | return -EINVAL; | |
185 | } | |
186 | if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) { | |
187 | // earlier epoch, take its entire status | |
188 | *status = std::move(*p); | |
189 | } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) { | |
190 | // same epoch, take any earlier markers | |
191 | auto m = status->sync_markers.begin(); | |
192 | for (auto& shard : p->sync_markers) { | |
193 | if (shard.second < m->second) { | |
194 | m->second = std::move(shard.second); | |
195 | } | |
196 | ++m; | |
197 | } | |
198 | } | |
199 | } | |
200 | return 0; | |
201 | } | |
202 | ||
/// state shared across successive trim attempts by the polling coroutines
struct TrimEnv {
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  int num_shards; //< number of mdlog shards to consider for trimming
  const rgw_zone_id& zone; //< this zone's id
  // NOTE(review): 'zone' is a reference member bound to the return of
  // zone_id() in the ctor — assumes that call returns a reference to
  // long-lived storage rather than a temporary; confirm against RGWSI_Zone
  Cursor current; //< cursor to current period
  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged

  TrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : dpp(dpp), store(store), http(http), num_shards(num_shards),
      zone(store->svc()->zone->zone_id()),
      current(store->svc()->mdlog->get_period_history()->get_current())
  {}
};
218 | ||
/// trim state for the metadata master zone, which trims based on the sync
/// status reported by its peers
struct MasterTrimEnv : public TrimEnv {
  connection_map connections; //< peer connections
  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
  /// last trim marker for each shard, only applies to current period's mdlog
  std::vector<std::string> last_trim_markers;

  MasterTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(dpp, store, http, num_shards),
      last_trim_markers(num_shards)
  {
    auto& period = current.get_period();
    // connect to every zone in the realm, then drop our own entry so we
    // only query remote peers
    connections = make_peer_connections(store, period.get_map().zonegroups);
    connections.erase(zone.id);
    peer_status.resize(connections.size());
  }
};
235 | ||
/// trim state for a non-master zone, which trims based on the master's
/// mdlog positions (timestamps)
struct PeerTrimEnv : public TrimEnv {
  /// last trim timestamp for each shard, only applies to current period's mdlog
  std::vector<ceph::real_time> last_trim_timestamps;

  PeerTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(dpp, store, http, num_shards),
      last_trim_timestamps(num_shards)
  {}

  /// adopt the master's shard count; resize() preserves the timestamps
  /// already recorded for existing shards
  void set_num_shards(int num_shards) {
    this->num_shards = num_shards;
    last_trim_timestamps.resize(num_shards);
  }
};
250 | ||
251 | } // anonymous namespace | |
252 | ||
253 | ||
254 | /// spawn a trim cr for each shard that needs it, while limiting the number | |
255 | /// of concurrent shards | |
/// spawn a trim cr for each shard that needs it, while limiting the number
/// of concurrent shards
class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
 private:
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  MasterTrimEnv& env;
  RGWMetadataLog *mdlog; //< mdlog of the current period
  int shard_id{0}; //< next shard to consider; advanced by spawn_next()
  std::string oid; //< scratch buffer for the current shard's object name
  const rgw_meta_sync_status& sync_status; //< status to compute safe markers from

 public:
  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
                               const rgw_meta_sync_status& sync_status)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), sync_status(sync_status)
  {}

  bool spawn_next() override;
};
275 | ||
276 | bool MetaMasterTrimShardCollectCR::spawn_next() | |
277 | { | |
278 | while (shard_id < env.num_shards) { | |
279 | auto m = sync_status.sync_markers.find(shard_id); | |
280 | if (m == sync_status.sync_markers.end()) { | |
281 | shard_id++; | |
282 | continue; | |
283 | } | |
284 | auto& stable = get_stable_marker(m->second); | |
285 | auto& last_trim = env.last_trim_markers[shard_id]; | |
286 | ||
287 | if (stable <= last_trim) { | |
288 | // already trimmed | |
289 | ldout(cct, 20) << "skipping log shard " << shard_id | |
290 | << " at marker=" << stable | |
291 | << " last_trim=" << last_trim | |
292 | << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl; | |
293 | shard_id++; | |
294 | continue; | |
295 | } | |
296 | ||
297 | mdlog->get_shard_oid(shard_id, oid); | |
298 | ||
299 | ldout(cct, 10) << "trimming log shard " << shard_id | |
300 | << " at marker=" << stable | |
301 | << " last_trim=" << last_trim | |
302 | << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl; | |
303 | spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false); | |
304 | shard_id++; | |
305 | return true; | |
306 | } | |
307 | return false; | |
308 | } | |
309 | ||
310 | /// spawn rest requests to read each peer's sync status | |
311 | class MetaMasterStatusCollectCR : public RGWShardCollectCR { | |
312 | static constexpr int MAX_CONCURRENT_SHARDS = 16; | |
313 | ||
314 | MasterTrimEnv& env; | |
315 | connection_map::iterator c; | |
316 | std::vector<rgw_meta_sync_status>::iterator s; | |
317 | public: | |
318 | explicit MetaMasterStatusCollectCR(MasterTrimEnv& env) | |
319 | : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), | |
320 | env(env), c(env.connections.begin()), s(env.peer_status.begin()) | |
321 | {} | |
322 | ||
323 | bool spawn_next() override { | |
324 | if (c == env.connections.end()) { | |
325 | return false; | |
326 | } | |
327 | static rgw_http_param_pair params[] = { | |
328 | { "type", "metadata" }, | |
329 | { "status", nullptr }, | |
330 | { nullptr, nullptr } | |
331 | }; | |
332 | ||
333 | ldout(cct, 20) << "query sync status from " << c->first << dendl; | |
334 | auto conn = c->second.get(); | |
335 | using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>; | |
336 | spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s), | |
337 | false); | |
338 | ++c; | |
339 | ++s; | |
340 | return true; | |
341 | } | |
342 | }; | |
343 | ||
/// one full trim pass on the master: gather peer sync status, purge old
/// period mdlogs, and trim the current period's shards
class MetaMasterTrimCR : public RGWCoroutine {
  MasterTrimEnv& env;
  rgw_meta_sync_status min_status; //< minimum sync status of all peers
  int ret{0}; //< holds the take_min_status() result in operate()

 public:
  explicit MetaMasterTrimCR(MasterTrimEnv& env)
    : RGWCoroutine(env.store->ctx()), env(env)
  {}

  int operate() override;
};
356 | ||
357 | int MetaMasterTrimCR::operate() | |
358 | { | |
359 | reenter(this) { | |
360 | // TODO: detect this and fail before we spawn the trim thread? | |
361 | if (env.connections.empty()) { | |
362 | ldout(cct, 4) << "no peers, exiting" << dendl; | |
363 | return set_cr_done(); | |
364 | } | |
365 | ||
366 | ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl; | |
367 | // query mdlog sync status from peers | |
368 | yield call(new MetaMasterStatusCollectCR(env)); | |
369 | ||
370 | // must get a successful reply from all peers to consider trimming | |
371 | if (ret < 0) { | |
372 | ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; | |
373 | return set_cr_error(ret); | |
374 | } | |
375 | ||
376 | // determine the minimum epoch and markers | |
377 | ret = take_min_status(env.store->ctx(), env.peer_status.begin(), | |
378 | env.peer_status.end(), &min_status); | |
379 | if (ret < 0) { | |
380 | ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl; | |
381 | return set_cr_error(ret); | |
382 | } | |
383 | yield { | |
384 | auto store = env.store; | |
385 | auto epoch = min_status.sync_info.realm_epoch; | |
386 | ldout(cct, 4) << "realm epoch min=" << epoch | |
387 | << " current=" << env.current.get_epoch()<< dendl; | |
388 | if (epoch > env.last_trim_epoch + 1) { | |
389 | // delete any prior mdlog periods | |
390 | spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true); | |
391 | } else { | |
392 | ldout(cct, 10) << "mdlogs already purged up to realm_epoch " | |
393 | << env.last_trim_epoch << dendl; | |
394 | } | |
395 | ||
396 | // if realm_epoch == current, trim mdlog based on markers | |
397 | if (epoch == env.current.get_epoch()) { | |
398 | auto mdlog = store->svc()->mdlog->get_log(env.current.get_period().get_id()); | |
399 | spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true); | |
400 | } | |
401 | } | |
402 | // ignore any errors during purge/trim because we want to hold the lock open | |
403 | return set_cr_done(); | |
404 | } | |
405 | return 0; | |
406 | } | |
407 | ||
408 | ||
409 | /// read the first entry of the master's mdlog shard and trim to that position | |
/// read the first entry of the master's mdlog shard and trim to that position
class MetaPeerTrimShardCR : public RGWCoroutine {
  RGWMetaSyncEnv& env;
  RGWMetadataLog *mdlog; //< local mdlog whose shard gets trimmed
  const std::string& period_id;
  const int shard_id;
  RGWMetadataLogInfo info; //< shard info read from the master
  ceph::real_time stable; //< safe timestamp to trim, according to master
  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
  rgw_mdlog_shard_data result; //< result from master's mdlog listing

 public:
  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
                      const std::string& period_id, int shard_id,
                      ceph::real_time *last_trim)
    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
  {}

  int operate() override;
};
430 | ||
int MetaPeerTrimShardCR::operate()
{
  reenter(this) {
    // query master's first mdlog entry for this shard
    yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
                                                 "", 1, &result));
    if (retcode < 0) {
      ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
          << shard_id << " for period " << period_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    if (result.entries.empty()) {
      // if there are no mdlog entries, we don't have a timestamp to compare. we
      // can't just trim everything, because there could be racing updates since
      // this empty reply. query the mdlog shard info to read its max timestamp,
      // then retry the listing to make sure it's still empty before trimming to
      // that
      ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id
          << ", reading last timestamp from shard info" << dendl;
      // read the mdlog shard info for the last timestamp
      yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &info));
      if (retcode < 0) {
        ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (ceph::real_clock::is_zero(info.last_update)) {
        return set_cr_done(); // nothing to trim
      }
      ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update="
          << info.last_update << dendl;
      // re-read the master's first mdlog entry to make sure it hasn't changed
      yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
                                                   "", 1, &result));
      if (retcode < 0) {
        ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      // if the mdlog is still empty, trim to max marker
      if (result.entries.empty()) {
        stable = info.last_update;
      } else {
        // an entry appeared between the two listings; fall back to trimming
        // up to (but not including) that entry's timestamp
        stable = result.entries.front().timestamp;

        // can only trim -up to- master's first timestamp, so subtract a second.
        // (this is why we use timestamps instead of markers for the peers)
        stable -= std::chrono::seconds(1);
      }
    } else {
      // non-empty: trim up to (but not including) the master's first entry
      stable = result.entries.front().timestamp;
      stable -= std::chrono::seconds(1);
    }

    if (stable <= *last_trim) {
      ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id
          << " at timestamp=" << stable
          << " last_trim=" << *last_trim << dendl;
      return set_cr_done();
    }

    ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
        << " at timestamp=" << stable
        << " last_trim=" << *last_trim << dendl;
    yield {
      std::string oid;
      mdlog->get_shard_oid(shard_id, oid);
      call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
    }
    // -ENODATA means the shard had nothing in that range, which is fine
    if (retcode < 0 && retcode != -ENODATA) {
      ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    *last_trim = stable;
    return set_cr_done();
  }
  return 0;
}
513 | ||
/// trim each local mdlog shard against the master's position, limiting the
/// number of concurrently running shard coroutines
class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  PeerTrimEnv& env;
  RGWMetadataLog *mdlog;
  const std::string& period_id; //< current period, taken from env
  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
  int shard_id{0}; //< next shard to spawn

 public:
  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
  {
    // set up a sync env so the shard CRs can reuse the remote mdlog
    // list/info coroutines against the master connection
    meta_env.init(env.dpp, cct, env.store, env.store->svc()->zone->get_master_conn(),
                  env.store->svc()->rados->get_async_processor(), env.http, nullptr,
                  env.store->getRados()->get_sync_tracer());
  }

  bool spawn_next() override;
};
535 | ||
536 | bool MetaPeerTrimShardCollectCR::spawn_next() | |
537 | { | |
538 | if (shard_id >= env.num_shards) { | |
539 | return false; | |
540 | } | |
541 | auto& last_trim = env.last_trim_timestamps[shard_id]; | |
542 | spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim), | |
543 | false); | |
544 | shard_id++; | |
545 | return true; | |
546 | } | |
547 | ||
/// one full trim pass on a peer zone: read the master's mdlog info, purge
/// old period mdlogs, then trim the current period's shards
class MetaPeerTrimCR : public RGWCoroutine {
  PeerTrimEnv& env;
  rgw_mdlog_info mdlog_info; //< master's mdlog info

 public:
  explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}

  int operate() override;
};
557 | ||
int MetaPeerTrimCR::operate()
{
  reenter(this) {
    ldout(cct, 10) << "fetching master mdlog info" << dendl;
    yield {
      // query mdlog_info from master for oldest_log_period
      rgw_http_param_pair params[] = {
        { "type", "metadata" },
        { nullptr, nullptr }
      };

      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
      call(new LogInfoCR(cct, env.store->svc()->zone->get_master_conn(), env.http,
                         "/admin/log/", params, &mdlog_info));
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
      return set_cr_error(retcode);
    }
    // use master's shard count instead
    env.set_num_shards(mdlog_info.num_shards);

    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
      // delete any prior mdlog periods
      yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
                                       &env.last_trim_epoch));
    } else {
      ldout(cct, 10) << "mdlogs already purged through realm_epoch "
          << env.last_trim_epoch << dendl;
    }

    // if realm_epoch == current, trim mdlog based on master's markers
    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
      yield {
        auto mdlog = env.store->svc()->mdlog->get_log(env.current.get_period().get_id());
        call(new MetaPeerTrimShardCollectCR(env, mdlog));
        // ignore any errors during purge/trim because we want to hold the lock open
      }
    }
    return set_cr_done();
  }
  return 0;
}
601 | ||
/// base class that periodically takes a cluster-wide trim lock and, while
/// holding it, runs a trim coroutine supplied by the subclass
class MetaTrimPollCR : public RGWCoroutine  {
  rgw::sal::RGWRadosStore *const store;
  const utime_t interval; //< polling interval
  const rgw_raw_obj obj; //< rados object used for the trim lock
  const std::string name{"meta_trim"}; //< lock name
  const std::string cookie; //< random cookie identifying this lock holder

 protected:
  /// allocate the coroutine to run within the lease
  virtual RGWCoroutine* alloc_cr() = 0;

 public:
  MetaTrimPollCR(rgw::sal::RGWRadosStore *store, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), interval(interval),
      obj(store->svc()->zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate() override;
};
622 | ||
int MetaTrimPollCR::operate()
{
  reenter(this) {
    // loop forever: sleep, take the lock, trim
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie, interval.sec()));
      if (retcode < 0) {
        // someone else holds the lock; retry on the next interval
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(alloc_cr());

      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
      // on success the lock is deliberately NOT released: it expires on its
      // own after 'interval', keeping other gateways from re-trimming early
    }
  }
  return 0;
}
652 | ||
/// polling trim loop for the metadata master zone
class MetaMasterTrimPollCR : public MetaTrimPollCR  {
  MasterTrimEnv env; //< trim state to share between calls
  RGWCoroutine* alloc_cr() override {
    return new MetaMasterTrimCR(env);
  }
 public:
  MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                       int num_shards, utime_t interval)
    : MetaTrimPollCR(store, interval),
      env(dpp, store, http, num_shards)
  {}
};
665 | ||
/// polling trim loop for non-master zones
class MetaPeerTrimPollCR : public MetaTrimPollCR {
  PeerTrimEnv env; //< trim state to share between calls
  RGWCoroutine* alloc_cr() override {
    return new MetaPeerTrimCR(env);
  }
 public:
  MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                     int num_shards, utime_t interval)
    : MetaTrimPollCR(store, interval),
      env(dpp, store, http, num_shards)
  {}
};
678 | ||
679 | RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, | |
680 | int num_shards, utime_t interval) | |
681 | { | |
682 | if (store->svc()->zone->is_meta_master()) { | |
683 | return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval); | |
684 | } | |
685 | return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval); | |
686 | } | |
687 | ||
688 | ||
/// single-shot trim for radosgw-admin: owns its own trim state. the env is a
/// private base so it is initialized before the MetaMasterTrimCR base that
/// holds a reference to it
struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
  MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : MasterTrimEnv(dpp, store, http, num_shards),
      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
  {}
};
695 | ||
/// single-shot peer-zone trim for radosgw-admin: owns its own trim state.
/// the env is a private base so it is initialized before the MetaPeerTrimCR
/// base that holds a reference to it
struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
  MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : PeerTrimEnv(dpp, store, http, num_shards),
      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
  {}
};
702 | ||
703 | RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, | |
704 | RGWHTTPManager *http, | |
705 | int num_shards) | |
706 | { | |
707 | if (store->svc()->zone->is_meta_master()) { | |
708 | return new MetaMasterAdminTrimCR(dpp, store, http, num_shards); | |
709 | } | |
710 | return new MetaPeerAdminTrimCR(dpp, store, http, num_shards); | |
711 | } |