]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_trim_mdlog.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_trim_mdlog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include <memory>

#include "common/errno.h"

#include "rgw_trim_mdlog.h"
#include "rgw_sync.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "services/svc_zone.h"
#include "services/svc_meta.h"
#include "services/svc_mdlog.h"
#include "services/svc_cls.h"

#include <boost/asio/yield.hpp>
18
19 #define dout_subsys ceph_subsys_rgw
20
21 #undef dout_prefix
22 #define dout_prefix (*_dout << "meta trim: ")
23
24 /// purge all log shards for the given mdlog
25 class PurgeLogShardsCR : public RGWShardCollectCR {
26 rgw::sal::RGWRadosStore *const store;
27 const RGWMetadataLog* mdlog;
28 const int num_shards;
29 rgw_raw_obj obj;
30 int i{0};
31
32 static constexpr int max_concurrent = 16;
33
34 public:
35 PurgeLogShardsCR(rgw::sal::RGWRadosStore *store, const RGWMetadataLog* mdlog,
36 const rgw_pool& pool, int num_shards)
37 : RGWShardCollectCR(store->ctx(), max_concurrent),
38 store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
39 {}
40
41 bool spawn_next() override {
42 if (i == num_shards) {
43 return false;
44 }
45 mdlog->get_shard_oid(i++, obj.oid);
46 spawn(new RGWRadosRemoveCR(store, obj), false);
47 return true;
48 }
49 };
50
51 using Cursor = RGWPeriodHistory::Cursor;
52
/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
class PurgePeriodLogsCR : public RGWCoroutine {
  // service handles resolved once at construction
  struct Svc {
    RGWSI_Zone *zone;
    RGWSI_MDLog *mdlog;
  } svc;
  rgw::sal::RGWRadosStore *const store;
  RGWMetadataManager *const metadata;
  RGWObjVersionTracker objv; //< version of the mdlog-history object, for race detection
  Cursor cursor; //< position in the period history, starts at the oldest log period
  epoch_t realm_epoch; //< purge stops at the first period with this epoch or later
  epoch_t *last_trim_epoch; //< update last trim on success

 public:
  PurgePeriodLogsCR(rgw::sal::RGWRadosStore *store, epoch_t realm_epoch, epoch_t *last_trim)
    : RGWCoroutine(store->ctx()), store(store), metadata(store->ctl()->meta.mgr),
      realm_epoch(realm_epoch), last_trim_epoch(last_trim) {
    svc.zone = store->svc()->zone;
    svc.mdlog = store->svc()->mdlog;
  }

  int operate() override;
};
76
// walk the period history from the oldest period forward, removing each
// period's mdlog shard objects and advancing the shared mdlog history,
// until reaching the target realm_epoch
int PurgePeriodLogsCR::operate()
{
  reenter(this) {
    // read our current oldest log period
    yield call(svc.mdlog->read_oldest_log_period_cr(&cursor, &objv));
    if (retcode < 0) {
      return set_cr_error(retcode);
    }
    ceph_assert(cursor);
    ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
        << " period=" << cursor.get_period().get_id() << dendl;

    // trim -up to- the given realm_epoch
    while (cursor.get_epoch() < realm_epoch) {
      ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
          << " period=" << cursor.get_period().get_id() << dendl;
      yield {
        // remove all shard objects belonging to this period's mdlog
        const auto mdlog = svc.mdlog->get_log(cursor.get_period().get_id());
        const auto& pool = svc.zone->get_zone_params().log_pool;
        auto num_shards = cct->_conf->rgw_md_log_max_shards;
        call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
      }
      if (retcode < 0) {
        ldout(cct, 1) << "failed to remove log shards: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
          << " period=" << cursor.get_period().get_id() << dendl;

      // update our mdlog history
      yield call(svc.mdlog->trim_log_period_cr(cursor, &objv));
      if (retcode == -ENOENT) {
        // must have raced to update mdlog history. return success and allow the
        // winner to continue purging
        ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
            << " period=" << cursor.get_period().get_id() << dendl;
        return set_cr_done();
      } else if (retcode < 0) {
        ldout(cct, 1) << "failed to remove log shards for realm_epoch="
            << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
            << " with: " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      // record the newest epoch whose logs have been purged
      if (*last_trim_epoch < cursor.get_epoch()) {
        *last_trim_epoch = cursor.get_epoch();
      }

      ceph_assert(cursor.has_next()); // get_current() should always come after
      cursor.next();
    }
    return set_cr_done();
  }
  return 0;
}
133
134 namespace {
135
136 using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
137
138 /// construct a RGWRESTConn for each zone in the realm
139 template <typename Zonegroups>
140 connection_map make_peer_connections(rgw::sal::RGWRadosStore *store,
141 const Zonegroups& zonegroups)
142 {
143 connection_map connections;
144 for (auto& g : zonegroups) {
145 for (auto& z : g.second.zones) {
146 std::unique_ptr<RGWRESTConn> conn{
147 new RGWRESTConn(store->ctx(), store->svc()->zone, z.first.id, z.second.endpoints)};
148 connections.emplace(z.first.id, std::move(conn));
149 }
150 }
151 return connections;
152 }
153
154 /// return the marker that it's safe to trim up to
155 const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
156 {
157 return m.state == m.FullSync ? m.next_step_marker : m.marker;
158 }
159
160 /// comparison operator for take_min_status()
161 bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
162 {
163 // sort by stable marker
164 return get_stable_marker(lhs) < get_stable_marker(rhs);
165 }
166
167 /// populate the status with the minimum stable marker of each shard for any
168 /// peer whose realm_epoch matches the minimum realm_epoch in the input
169 template <typename Iter>
170 int take_min_status(CephContext *cct, Iter first, Iter last,
171 rgw_meta_sync_status *status)
172 {
173 if (first == last) {
174 return -EINVAL;
175 }
176 const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
177
178 status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
179 for (auto p = first; p != last; ++p) {
180 // validate peer's shard count
181 if (p->sync_markers.size() != num_shards) {
182 ldout(cct, 1) << "take_min_status got peer status with "
183 << p->sync_markers.size() << " shards, expected "
184 << num_shards << dendl;
185 return -EINVAL;
186 }
187 if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
188 // earlier epoch, take its entire status
189 *status = std::move(*p);
190 } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
191 // same epoch, take any earlier markers
192 auto m = status->sync_markers.begin();
193 for (auto& shard : p->sync_markers) {
194 if (shard.second < m->second) {
195 m->second = std::move(shard.second);
196 }
197 ++m;
198 }
199 }
200 }
201 return 0;
202 }
203
/// state shared by the trim coroutines across polling iterations
struct TrimEnv {
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  int num_shards; //< number of mdlog shards
  const rgw_zone_id& zone; //< our own zone id
  Cursor current; //< cursor to current period
  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged

  TrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : dpp(dpp), store(store), http(http), num_shards(num_shards),
      zone(store->svc()->zone->zone_id()),
      current(store->svc()->mdlog->get_period_history()->get_current())
  {}
};
219
/// trim state for the metadata master zone, which trims against the sync
/// progress reported by its peers
struct MasterTrimEnv : public TrimEnv {
  connection_map connections; //< peer connections
  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
  /// last trim marker for each shard, only applies to current period's mdlog
  std::vector<std::string> last_trim_markers;

  MasterTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(dpp, store, http, num_shards),
      last_trim_markers(num_shards)
  {
    auto& period = current.get_period();
    // build a connection to every zone in the realm except ourselves
    connections = make_peer_connections(store, period.get_map().zonegroups);
    connections.erase(zone.id);
    peer_status.resize(connections.size());
  }
};
236
/// trim state for a non-master zone, which trims against the master's mdlog
struct PeerTrimEnv : public TrimEnv {
  /// last trim timestamp for each shard, only applies to current period's mdlog
  std::vector<ceph::real_time> last_trim_timestamps;

  PeerTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(dpp, store, http, num_shards),
      last_trim_timestamps(num_shards)
  {}

  /// adopt the master's shard count, resizing the timestamp vector to match
  void set_num_shards(int num_shards) {
    this->num_shards = num_shards;
    last_trim_timestamps.resize(num_shards);
  }
};
251
252 } // anonymous namespace
253
254
/// spawn a trim cr for each shard that needs it, while limiting the number
/// of concurrent shards
class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
 private:
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  MasterTrimEnv& env;
  RGWMetadataLog *mdlog; //< mdlog of the current period
  int shard_id{0}; //< next shard to consider, advanced by spawn_next()
  std::string oid; //< scratch buffer for the current shard's object name
  const rgw_meta_sync_status& sync_status; //< minimum status over all peers

 public:
  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
                               const rgw_meta_sync_status& sync_status)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), sync_status(sync_status)
  {}

  bool spawn_next() override;
};
276
277 bool MetaMasterTrimShardCollectCR::spawn_next()
278 {
279 while (shard_id < env.num_shards) {
280 auto m = sync_status.sync_markers.find(shard_id);
281 if (m == sync_status.sync_markers.end()) {
282 shard_id++;
283 continue;
284 }
285 auto& stable = get_stable_marker(m->second);
286 auto& last_trim = env.last_trim_markers[shard_id];
287
288 if (stable <= last_trim) {
289 // already trimmed
290 ldout(cct, 20) << "skipping log shard " << shard_id
291 << " at marker=" << stable
292 << " last_trim=" << last_trim
293 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
294 shard_id++;
295 continue;
296 }
297
298 mdlog->get_shard_oid(shard_id, oid);
299
300 ldout(cct, 10) << "trimming log shard " << shard_id
301 << " at marker=" << stable
302 << " last_trim=" << last_trim
303 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
304 spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
305 shard_id++;
306 return true;
307 }
308 return false;
309 }
310
311 /// spawn rest requests to read each peer's sync status
312 class MetaMasterStatusCollectCR : public RGWShardCollectCR {
313 static constexpr int MAX_CONCURRENT_SHARDS = 16;
314
315 MasterTrimEnv& env;
316 connection_map::iterator c;
317 std::vector<rgw_meta_sync_status>::iterator s;
318 public:
319 explicit MetaMasterStatusCollectCR(MasterTrimEnv& env)
320 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
321 env(env), c(env.connections.begin()), s(env.peer_status.begin())
322 {}
323
324 bool spawn_next() override {
325 if (c == env.connections.end()) {
326 return false;
327 }
328 static rgw_http_param_pair params[] = {
329 { "type", "metadata" },
330 { "status", nullptr },
331 { nullptr, nullptr }
332 };
333
334 ldout(cct, 20) << "query sync status from " << c->first << dendl;
335 auto conn = c->second.get();
336 using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
337 spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
338 false);
339 ++c;
340 ++s;
341 return true;
342 }
343 };
344
/// master-side trim pass: collect peer sync status, purge old period mdlogs,
/// and trim the current period's shards up to the minimum stable markers
class MetaMasterTrimCR : public RGWCoroutine {
  MasterTrimEnv& env;
  rgw_meta_sync_status min_status; //< minimum sync status of all peers
  int ret{0}; //< holds the result of take_min_status()

 public:
  explicit MetaMasterTrimCR(MasterTrimEnv& env)
    : RGWCoroutine(env.store->ctx()), env(env)
  {}

  int operate() override;
};
357
358 int MetaMasterTrimCR::operate()
359 {
360 reenter(this) {
361 // TODO: detect this and fail before we spawn the trim thread?
362 if (env.connections.empty()) {
363 ldout(cct, 4) << "no peers, exiting" << dendl;
364 return set_cr_done();
365 }
366
367 ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
368 // query mdlog sync status from peers
369 yield call(new MetaMasterStatusCollectCR(env));
370
371 // must get a successful reply from all peers to consider trimming
372 if (ret < 0) {
373 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
374 return set_cr_error(ret);
375 }
376
377 // determine the minimum epoch and markers
378 ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
379 env.peer_status.end(), &min_status);
380 if (ret < 0) {
381 ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
382 return set_cr_error(ret);
383 }
384 yield {
385 auto store = env.store;
386 auto epoch = min_status.sync_info.realm_epoch;
387 ldout(cct, 4) << "realm epoch min=" << epoch
388 << " current=" << env.current.get_epoch()<< dendl;
389 if (epoch > env.last_trim_epoch + 1) {
390 // delete any prior mdlog periods
391 spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
392 } else {
393 ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
394 << env.last_trim_epoch << dendl;
395 }
396
397 // if realm_epoch == current, trim mdlog based on markers
398 if (epoch == env.current.get_epoch()) {
399 auto mdlog = store->svc()->mdlog->get_log(env.current.get_period().get_id());
400 spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
401 }
402 }
403 // ignore any errors during purge/trim because we want to hold the lock open
404 return set_cr_done();
405 }
406 return 0;
407 }
408
409
/// read the first entry of the master's mdlog shard and trim to that position
class MetaPeerTrimShardCR : public RGWCoroutine {
  RGWMetaSyncEnv& env;
  RGWMetadataLog *mdlog; //< local mdlog whose shard gets trimmed
  const std::string& period_id;
  const int shard_id;
  RGWMetadataLogInfo info; //< shard info fetched from the master
  ceph::real_time stable; //< safe timestamp to trim, according to master
  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
  rgw_mdlog_shard_data result; //< result from master's mdlog listing

 public:
  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
                      const std::string& period_id, int shard_id,
                      ceph::real_time *last_trim)
    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
  {}

  int operate() override;
};
431
// determine a stable trim timestamp from the master's mdlog shard, then trim
// the local shard up to it
int MetaPeerTrimShardCR::operate()
{
  reenter(this) {
    // query master's first mdlog entry for this shard
    yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
                                                 "", 1, &result));
    if (retcode < 0) {
      ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
          << shard_id << " for period " << period_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    if (result.entries.empty()) {
      // if there are no mdlog entries, we don't have a timestamp to compare. we
      // can't just trim everything, because there could be racing updates since
      // this empty reply. query the mdlog shard info to read its max timestamp,
      // then retry the listing to make sure it's still empty before trimming to
      // that
      ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id
          << ", reading last timestamp from shard info" << dendl;
      // read the mdlog shard info for the last timestamp
      yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &info));
      if (retcode < 0) {
        ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (ceph::real_clock::is_zero(info.last_update)) {
        return set_cr_done(); // nothing to trim
      }
      ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update="
          << info.last_update << dendl;
      // re-read the master's first mdlog entry to make sure it hasn't changed
      yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
                                                   "", 1, &result));
      if (retcode < 0) {
        ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      // if the mdlog is still empty, trim to max marker
      if (result.entries.empty()) {
        stable = info.last_update;
      } else {
        stable = result.entries.front().timestamp;

        // can only trim -up to- master's first timestamp, so subtract a second.
        // (this is why we use timestamps instead of markers for the peers)
        stable -= std::chrono::seconds(1);
      }
    } else {
      stable = result.entries.front().timestamp;
      stable -= std::chrono::seconds(1);
    }

    if (stable <= *last_trim) {
      // already trimmed to this point in a previous pass
      ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id
          << " at timestamp=" << stable
          << " last_trim=" << *last_trim << dendl;
      return set_cr_done();
    }

    ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
        << " at timestamp=" << stable
        << " last_trim=" << *last_trim << dendl;
    yield {
      std::string oid;
      mdlog->get_shard_oid(shard_id, oid);
      // trim everything from the beginning of time up to 'stable'
      call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
    }
    // -ENODATA is tolerated: an already-empty range still counts as trimmed
    if (retcode < 0 && retcode != -ENODATA) {
      ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    *last_trim = stable;
    return set_cr_done();
  }
  return 0;
}
514
/// spawn a MetaPeerTrimShardCR for each local mdlog shard, limiting concurrency
class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  PeerTrimEnv& env;
  RGWMetadataLog *mdlog; //< local mdlog of the current period
  const std::string& period_id;
  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
  int shard_id{0}; //< next shard to spawn

 public:
  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
  {
    // meta_env carries the master connection and async processor that the
    // per-shard coroutines use to query the master's mdlog
    meta_env.init(env.dpp, cct, env.store, env.store->svc()->zone->get_master_conn(),
                  env.store->svc()->rados->get_async_processor(), env.http, nullptr,
                  env.store->getRados()->get_sync_tracer());
  }

  bool spawn_next() override;
};
536
537 bool MetaPeerTrimShardCollectCR::spawn_next()
538 {
539 if (shard_id >= env.num_shards) {
540 return false;
541 }
542 auto& last_trim = env.last_trim_timestamps[shard_id];
543 spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
544 false);
545 shard_id++;
546 return true;
547 }
548
/// peer-side trim pass: read the master's mdlog info, purge mdlogs of periods
/// older than the master's oldest, and trim current-period shards to match
class MetaPeerTrimCR : public RGWCoroutine {
  PeerTrimEnv& env;
  rgw_mdlog_info mdlog_info; //< master's mdlog info

 public:
  explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}

  int operate() override;
};
558
int MetaPeerTrimCR::operate()
{
  reenter(this) {
    ldout(cct, 10) << "fetching master mdlog info" << dendl;
    yield {
      // query mdlog_info from master for oldest_log_period
      rgw_http_param_pair params[] = {
        { "type", "metadata" },
        { nullptr, nullptr }
      };

      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
      call(new LogInfoCR(cct, env.store->svc()->zone->get_master_conn(), env.http,
                         "/admin/log/", params, &mdlog_info));
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
      return set_cr_error(retcode);
    }
    // use master's shard count instead
    env.set_num_shards(mdlog_info.num_shards);

    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
      // delete any prior mdlog periods
      // (retcode intentionally not checked; see comment below)
      yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
                                       &env.last_trim_epoch));
    } else {
      ldout(cct, 10) << "mdlogs already purged through realm_epoch "
          << env.last_trim_epoch << dendl;
    }

    // if realm_epoch == current, trim mdlog based on master's markers
    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
      yield {
        auto mdlog = env.store->svc()->mdlog->get_log(env.current.get_period().get_id());
        call(new MetaPeerTrimShardCollectCR(env, mdlog));
        // ignore any errors during purge/trim because we want to hold the lock open
      }
    }
    return set_cr_done();
  }
  return 0;
}
602
/// base polling loop: periodically take a rados lock and run the trim
/// coroutine supplied by alloc_cr()
class MetaTrimPollCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  const utime_t interval; //< polling interval
  const rgw_raw_obj obj; //< rados object used for the trim lock
  const std::string name{"meta_trim"}; //< lock name
  const std::string cookie; //< random cookie identifying this lock holder

 protected:
  /// allocate the coroutine to run within the lease
  virtual RGWCoroutine* alloc_cr() = 0;

 public:
  MetaTrimPollCR(rgw::sal::RGWRadosStore *store, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), interval(interval),
      obj(store->svc()->zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate() override;
};
623
// sleep, take the trim lock for the full interval, run one trim pass; only
// release the lock early on error so another gateway can take over
int MetaTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie, interval.sec()));
      if (retcode < 0) {
        // another gateway holds the lock; try again next interval
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(alloc_cr());

      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
      // on success the lock is left to expire on its own, so nobody else
      // re-trims before the next interval
    }
  }
  return 0;
}
653
654 class MetaMasterTrimPollCR : public MetaTrimPollCR {
655 MasterTrimEnv env; //< trim state to share between calls
656 RGWCoroutine* alloc_cr() override {
657 return new MetaMasterTrimCR(env);
658 }
659 public:
660 MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
661 int num_shards, utime_t interval)
662 : MetaTrimPollCR(store, interval),
663 env(dpp, store, http, num_shards)
664 {}
665 };
666
667 class MetaPeerTrimPollCR : public MetaTrimPollCR {
668 PeerTrimEnv env; //< trim state to share between calls
669 RGWCoroutine* alloc_cr() override {
670 return new MetaPeerTrimCR(env);
671 }
672 public:
673 MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
674 int num_shards, utime_t interval)
675 : MetaTrimPollCR(store, interval),
676 env(dpp, store, http, num_shards)
677 {}
678 };
679
680 RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
681 int num_shards, utime_t interval)
682 {
683 if (store->svc()->zone->is_meta_master()) {
684 return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval);
685 }
686 return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval);
687 }
688
689
/// one-shot master trim (no polling loop); owns its MasterTrimEnv via a
/// private base so the env shares the coroutine's lifetime
struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
  MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : MasterTrimEnv(dpp, store, http, num_shards),
      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
  {}
};
696
/// one-shot peer trim (no polling loop); owns its PeerTrimEnv via a
/// private base so the env shares the coroutine's lifetime
struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
  MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : PeerTrimEnv(dpp, store, http, num_shards),
      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
  {}
};
703
704 RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store,
705 RGWHTTPManager *http,
706 int num_shards)
707 {
708 if (store->svc()->zone->is_meta_master()) {
709 return new MetaMasterAdminTrimCR(dpp, store, http, num_shards);
710 }
711 return new MetaPeerAdminTrimCR(dpp, store, http, num_shards);
712 }