]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_trim_mdlog.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_trim_mdlog.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "common/errno.h"
5
6#include "rgw_trim_mdlog.h"
7#include "rgw_sync.h"
8#include "rgw_cr_rados.h"
9#include "rgw_cr_rest.h"
9f95a23c
TL
10#include "rgw_zone.h"
11#include "services/svc_zone.h"
12#include "services/svc_meta.h"
13#include "services/svc_mdlog.h"
14#include "services/svc_cls.h"
15
16#include <boost/asio/yield.hpp>
17
18#define dout_subsys ceph_subsys_rgw
19
20#undef dout_prefix
21#define dout_prefix (*_dout << "meta trim: ")
22
23/// purge all log shards for the given mdlog
24class PurgeLogShardsCR : public RGWShardCollectCR {
25 rgw::sal::RGWRadosStore *const store;
26 const RGWMetadataLog* mdlog;
27 const int num_shards;
28 rgw_raw_obj obj;
29 int i{0};
30
31 static constexpr int max_concurrent = 16;
32
33 public:
34 PurgeLogShardsCR(rgw::sal::RGWRadosStore *store, const RGWMetadataLog* mdlog,
35 const rgw_pool& pool, int num_shards)
36 : RGWShardCollectCR(store->ctx(), max_concurrent),
37 store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
38 {}
39
40 bool spawn_next() override {
41 if (i == num_shards) {
42 return false;
43 }
44 mdlog->get_shard_oid(i++, obj.oid);
45 spawn(new RGWRadosRemoveCR(store, obj), false);
46 return true;
47 }
48};
49
50using Cursor = RGWPeriodHistory::Cursor;
51
52/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
53class PurgePeriodLogsCR : public RGWCoroutine {
54 struct Svc {
55 RGWSI_Zone *zone;
56 RGWSI_MDLog *mdlog;
57 } svc;
58 rgw::sal::RGWRadosStore *const store;
59 RGWMetadataManager *const metadata;
60 RGWObjVersionTracker objv;
61 Cursor cursor;
62 epoch_t realm_epoch;
63 epoch_t *last_trim_epoch; //< update last trim on success
64
65 public:
66 PurgePeriodLogsCR(rgw::sal::RGWRadosStore *store, epoch_t realm_epoch, epoch_t *last_trim)
67 : RGWCoroutine(store->ctx()), store(store), metadata(store->ctl()->meta.mgr),
68 realm_epoch(realm_epoch), last_trim_epoch(last_trim) {
69 svc.zone = store->svc()->zone;
70 svc.mdlog = store->svc()->mdlog;
71 }
72
73 int operate() override;
74};
75
76int PurgePeriodLogsCR::operate()
77{
78 reenter(this) {
79 // read our current oldest log period
80 yield call(svc.mdlog->read_oldest_log_period_cr(&cursor, &objv));
81 if (retcode < 0) {
82 return set_cr_error(retcode);
83 }
84 ceph_assert(cursor);
85 ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
86 << " period=" << cursor.get_period().get_id() << dendl;
87
88 // trim -up to- the given realm_epoch
89 while (cursor.get_epoch() < realm_epoch) {
90 ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
91 << " period=" << cursor.get_period().get_id() << dendl;
92 yield {
93 const auto mdlog = svc.mdlog->get_log(cursor.get_period().get_id());
94 const auto& pool = svc.zone->get_zone_params().log_pool;
95 auto num_shards = cct->_conf->rgw_md_log_max_shards;
96 call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
97 }
98 if (retcode < 0) {
99 ldout(cct, 1) << "failed to remove log shards: "
100 << cpp_strerror(retcode) << dendl;
101 return set_cr_error(retcode);
102 }
103 ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
104 << " period=" << cursor.get_period().get_id() << dendl;
105
106 // update our mdlog history
107 yield call(svc.mdlog->trim_log_period_cr(cursor, &objv));
108 if (retcode == -ENOENT) {
109 // must have raced to update mdlog history. return success and allow the
110 // winner to continue purging
111 ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
112 << " period=" << cursor.get_period().get_id() << dendl;
113 return set_cr_done();
114 } else if (retcode < 0) {
115 ldout(cct, 1) << "failed to remove log shards for realm_epoch="
116 << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
117 << " with: " << cpp_strerror(retcode) << dendl;
118 return set_cr_error(retcode);
119 }
120
121 if (*last_trim_epoch < cursor.get_epoch()) {
122 *last_trim_epoch = cursor.get_epoch();
123 }
124
125 ceph_assert(cursor.has_next()); // get_current() should always come after
126 cursor.next();
127 }
128 return set_cr_done();
129 }
130 return 0;
131}
132
133namespace {
134
135using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
136
137/// construct a RGWRESTConn for each zone in the realm
138template <typename Zonegroups>
139connection_map make_peer_connections(rgw::sal::RGWRadosStore *store,
140 const Zonegroups& zonegroups)
141{
142 connection_map connections;
143 for (auto& g : zonegroups) {
144 for (auto& z : g.second.zones) {
145 std::unique_ptr<RGWRESTConn> conn{
146 new RGWRESTConn(store->ctx(), store->svc()->zone, z.first.id, z.second.endpoints)};
147 connections.emplace(z.first.id, std::move(conn));
148 }
149 }
150 return connections;
151}
152
153/// return the marker that it's safe to trim up to
154const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
155{
156 return m.state == m.FullSync ? m.next_step_marker : m.marker;
157}
158
159/// comparison operator for take_min_status()
160bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
161{
162 // sort by stable marker
163 return get_stable_marker(lhs) < get_stable_marker(rhs);
164}
165
166/// populate the status with the minimum stable marker of each shard for any
167/// peer whose realm_epoch matches the minimum realm_epoch in the input
168template <typename Iter>
169int take_min_status(CephContext *cct, Iter first, Iter last,
170 rgw_meta_sync_status *status)
171{
172 if (first == last) {
173 return -EINVAL;
174 }
175 const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
176
177 status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
178 for (auto p = first; p != last; ++p) {
179 // validate peer's shard count
180 if (p->sync_markers.size() != num_shards) {
181 ldout(cct, 1) << "take_min_status got peer status with "
182 << p->sync_markers.size() << " shards, expected "
183 << num_shards << dendl;
184 return -EINVAL;
185 }
186 if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
187 // earlier epoch, take its entire status
188 *status = std::move(*p);
189 } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
190 // same epoch, take any earlier markers
191 auto m = status->sync_markers.begin();
192 for (auto& shard : p->sync_markers) {
193 if (shard.second < m->second) {
194 m->second = std::move(shard.second);
195 }
196 ++m;
197 }
198 }
199 }
200 return 0;
201}
202
203struct TrimEnv {
204 const DoutPrefixProvider *dpp;
205 rgw::sal::RGWRadosStore *const store;
206 RGWHTTPManager *const http;
207 int num_shards;
208 const rgw_zone_id& zone;
209 Cursor current; //< cursor to current period
210 epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
211
212 TrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
213 : dpp(dpp), store(store), http(http), num_shards(num_shards),
214 zone(store->svc()->zone->zone_id()),
215 current(store->svc()->mdlog->get_period_history()->get_current())
216 {}
217};
218
219struct MasterTrimEnv : public TrimEnv {
220 connection_map connections; //< peer connections
221 std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
222 /// last trim marker for each shard, only applies to current period's mdlog
223 std::vector<std::string> last_trim_markers;
224
225 MasterTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
226 : TrimEnv(dpp, store, http, num_shards),
227 last_trim_markers(num_shards)
228 {
229 auto& period = current.get_period();
230 connections = make_peer_connections(store, period.get_map().zonegroups);
231 connections.erase(zone.id);
232 peer_status.resize(connections.size());
233 }
234};
235
236struct PeerTrimEnv : public TrimEnv {
237 /// last trim timestamp for each shard, only applies to current period's mdlog
238 std::vector<ceph::real_time> last_trim_timestamps;
239
240 PeerTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
241 : TrimEnv(dpp, store, http, num_shards),
242 last_trim_timestamps(num_shards)
243 {}
244
245 void set_num_shards(int num_shards) {
246 this->num_shards = num_shards;
247 last_trim_timestamps.resize(num_shards);
248 }
249};
250
251} // anonymous namespace
252
253
254/// spawn a trim cr for each shard that needs it, while limiting the number
255/// of concurrent shards
256class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
257 private:
258 static constexpr int MAX_CONCURRENT_SHARDS = 16;
259
260 MasterTrimEnv& env;
261 RGWMetadataLog *mdlog;
262 int shard_id{0};
263 std::string oid;
264 const rgw_meta_sync_status& sync_status;
265
266 public:
267 MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
268 const rgw_meta_sync_status& sync_status)
269 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
270 env(env), mdlog(mdlog), sync_status(sync_status)
271 {}
272
273 bool spawn_next() override;
274};
275
276bool MetaMasterTrimShardCollectCR::spawn_next()
277{
278 while (shard_id < env.num_shards) {
279 auto m = sync_status.sync_markers.find(shard_id);
280 if (m == sync_status.sync_markers.end()) {
281 shard_id++;
282 continue;
283 }
284 auto& stable = get_stable_marker(m->second);
285 auto& last_trim = env.last_trim_markers[shard_id];
286
287 if (stable <= last_trim) {
288 // already trimmed
289 ldout(cct, 20) << "skipping log shard " << shard_id
290 << " at marker=" << stable
291 << " last_trim=" << last_trim
292 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
293 shard_id++;
294 continue;
295 }
296
297 mdlog->get_shard_oid(shard_id, oid);
298
299 ldout(cct, 10) << "trimming log shard " << shard_id
300 << " at marker=" << stable
301 << " last_trim=" << last_trim
302 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
303 spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
304 shard_id++;
305 return true;
306 }
307 return false;
308}
309
310/// spawn rest requests to read each peer's sync status
311class MetaMasterStatusCollectCR : public RGWShardCollectCR {
312 static constexpr int MAX_CONCURRENT_SHARDS = 16;
313
314 MasterTrimEnv& env;
315 connection_map::iterator c;
316 std::vector<rgw_meta_sync_status>::iterator s;
317 public:
318 explicit MetaMasterStatusCollectCR(MasterTrimEnv& env)
319 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
320 env(env), c(env.connections.begin()), s(env.peer_status.begin())
321 {}
322
323 bool spawn_next() override {
324 if (c == env.connections.end()) {
325 return false;
326 }
327 static rgw_http_param_pair params[] = {
328 { "type", "metadata" },
329 { "status", nullptr },
330 { nullptr, nullptr }
331 };
332
333 ldout(cct, 20) << "query sync status from " << c->first << dendl;
334 auto conn = c->second.get();
335 using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
336 spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
337 false);
338 ++c;
339 ++s;
340 return true;
341 }
342};
343
344class MetaMasterTrimCR : public RGWCoroutine {
345 MasterTrimEnv& env;
346 rgw_meta_sync_status min_status; //< minimum sync status of all peers
347 int ret{0};
348
349 public:
350 explicit MetaMasterTrimCR(MasterTrimEnv& env)
351 : RGWCoroutine(env.store->ctx()), env(env)
352 {}
353
354 int operate() override;
355};
356
357int MetaMasterTrimCR::operate()
358{
359 reenter(this) {
360 // TODO: detect this and fail before we spawn the trim thread?
361 if (env.connections.empty()) {
362 ldout(cct, 4) << "no peers, exiting" << dendl;
363 return set_cr_done();
364 }
365
366 ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
367 // query mdlog sync status from peers
368 yield call(new MetaMasterStatusCollectCR(env));
369
370 // must get a successful reply from all peers to consider trimming
371 if (ret < 0) {
372 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
373 return set_cr_error(ret);
374 }
375
376 // determine the minimum epoch and markers
377 ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
378 env.peer_status.end(), &min_status);
379 if (ret < 0) {
380 ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
381 return set_cr_error(ret);
382 }
383 yield {
384 auto store = env.store;
385 auto epoch = min_status.sync_info.realm_epoch;
386 ldout(cct, 4) << "realm epoch min=" << epoch
387 << " current=" << env.current.get_epoch()<< dendl;
388 if (epoch > env.last_trim_epoch + 1) {
389 // delete any prior mdlog periods
390 spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
391 } else {
392 ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
393 << env.last_trim_epoch << dendl;
394 }
395
396 // if realm_epoch == current, trim mdlog based on markers
397 if (epoch == env.current.get_epoch()) {
398 auto mdlog = store->svc()->mdlog->get_log(env.current.get_period().get_id());
399 spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
400 }
401 }
402 // ignore any errors during purge/trim because we want to hold the lock open
403 return set_cr_done();
404 }
405 return 0;
406}
407
408
409/// read the first entry of the master's mdlog shard and trim to that position
410class MetaPeerTrimShardCR : public RGWCoroutine {
411 RGWMetaSyncEnv& env;
412 RGWMetadataLog *mdlog;
413 const std::string& period_id;
414 const int shard_id;
415 RGWMetadataLogInfo info;
416 ceph::real_time stable; //< safe timestamp to trim, according to master
417 ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
418 rgw_mdlog_shard_data result; //< result from master's mdlog listing
419
420 public:
421 MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
422 const std::string& period_id, int shard_id,
423 ceph::real_time *last_trim)
424 : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
425 period_id(period_id), shard_id(shard_id), last_trim(last_trim)
426 {}
427
428 int operate() override;
429};
430
431int MetaPeerTrimShardCR::operate()
432{
433 reenter(this) {
434 // query master's first mdlog entry for this shard
435 yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
436 "", 1, &result));
437 if (retcode < 0) {
438 ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
439 << shard_id << " for period " << period_id
440 << ": " << cpp_strerror(retcode) << dendl;
441 return set_cr_error(retcode);
442 }
443 if (result.entries.empty()) {
444 // if there are no mdlog entries, we don't have a timestamp to compare. we
445 // can't just trim everything, because there could be racing updates since
446 // this empty reply. query the mdlog shard info to read its max timestamp,
447 // then retry the listing to make sure it's still empty before trimming to
448 // that
449 ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id
450 << ", reading last timestamp from shard info" << dendl;
451 // read the mdlog shard info for the last timestamp
452 yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &info));
453 if (retcode < 0) {
454 ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard "
455 << shard_id << " for period " << period_id
456 << ": " << cpp_strerror(retcode) << dendl;
457 return set_cr_error(retcode);
458 }
459 if (ceph::real_clock::is_zero(info.last_update)) {
460 return set_cr_done(); // nothing to trim
461 }
462 ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update="
463 << info.last_update << dendl;
464 // re-read the master's first mdlog entry to make sure it hasn't changed
465 yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
466 "", 1, &result));
467 if (retcode < 0) {
468 ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
469 << shard_id << " for period " << period_id
470 << ": " << cpp_strerror(retcode) << dendl;
471 return set_cr_error(retcode);
472 }
473 // if the mdlog is still empty, trim to max marker
474 if (result.entries.empty()) {
475 stable = info.last_update;
476 } else {
477 stable = result.entries.front().timestamp;
478
479 // can only trim -up to- master's first timestamp, so subtract a second.
480 // (this is why we use timestamps instead of markers for the peers)
481 stable -= std::chrono::seconds(1);
482 }
483 } else {
484 stable = result.entries.front().timestamp;
485 stable -= std::chrono::seconds(1);
486 }
487
488 if (stable <= *last_trim) {
489 ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id
490 << " at timestamp=" << stable
491 << " last_trim=" << *last_trim << dendl;
492 return set_cr_done();
493 }
494
495 ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
496 << " at timestamp=" << stable
497 << " last_trim=" << *last_trim << dendl;
498 yield {
499 std::string oid;
500 mdlog->get_shard_oid(shard_id, oid);
501 call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
502 }
503 if (retcode < 0 && retcode != -ENODATA) {
504 ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id
505 << ": " << cpp_strerror(retcode) << dendl;
506 return set_cr_error(retcode);
507 }
508 *last_trim = stable;
509 return set_cr_done();
510 }
511 return 0;
512}
513
514class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
515 static constexpr int MAX_CONCURRENT_SHARDS = 16;
516
517 PeerTrimEnv& env;
518 RGWMetadataLog *mdlog;
519 const std::string& period_id;
520 RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
521 int shard_id{0};
522
523 public:
524 MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
525 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
526 env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
527 {
528 meta_env.init(env.dpp, cct, env.store, env.store->svc()->zone->get_master_conn(),
529 env.store->svc()->rados->get_async_processor(), env.http, nullptr,
530 env.store->getRados()->get_sync_tracer());
531 }
532
533 bool spawn_next() override;
534};
535
536bool MetaPeerTrimShardCollectCR::spawn_next()
537{
538 if (shard_id >= env.num_shards) {
539 return false;
540 }
541 auto& last_trim = env.last_trim_timestamps[shard_id];
542 spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
543 false);
544 shard_id++;
545 return true;
546}
547
548class MetaPeerTrimCR : public RGWCoroutine {
549 PeerTrimEnv& env;
550 rgw_mdlog_info mdlog_info; //< master's mdlog info
551
552 public:
553 explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
554
555 int operate() override;
556};
557
558int MetaPeerTrimCR::operate()
559{
560 reenter(this) {
561 ldout(cct, 10) << "fetching master mdlog info" << dendl;
562 yield {
563 // query mdlog_info from master for oldest_log_period
564 rgw_http_param_pair params[] = {
565 { "type", "metadata" },
566 { nullptr, nullptr }
567 };
568
569 using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
570 call(new LogInfoCR(cct, env.store->svc()->zone->get_master_conn(), env.http,
571 "/admin/log/", params, &mdlog_info));
572 }
573 if (retcode < 0) {
574 ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
575 return set_cr_error(retcode);
576 }
577 // use master's shard count instead
578 env.set_num_shards(mdlog_info.num_shards);
579
580 if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
581 // delete any prior mdlog periods
582 yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
583 &env.last_trim_epoch));
584 } else {
585 ldout(cct, 10) << "mdlogs already purged through realm_epoch "
586 << env.last_trim_epoch << dendl;
587 }
588
589 // if realm_epoch == current, trim mdlog based on master's markers
590 if (mdlog_info.realm_epoch == env.current.get_epoch()) {
591 yield {
592 auto mdlog = env.store->svc()->mdlog->get_log(env.current.get_period().get_id());
593 call(new MetaPeerTrimShardCollectCR(env, mdlog));
594 // ignore any errors during purge/trim because we want to hold the lock open
595 }
596 }
597 return set_cr_done();
598 }
599 return 0;
600}
601
602class MetaTrimPollCR : public RGWCoroutine {
603 rgw::sal::RGWRadosStore *const store;
604 const utime_t interval; //< polling interval
605 const rgw_raw_obj obj;
606 const std::string name{"meta_trim"}; //< lock name
607 const std::string cookie;
608
609 protected:
610 /// allocate the coroutine to run within the lease
611 virtual RGWCoroutine* alloc_cr() = 0;
612
613 public:
614 MetaTrimPollCR(rgw::sal::RGWRadosStore *store, utime_t interval)
615 : RGWCoroutine(store->ctx()), store(store), interval(interval),
616 obj(store->svc()->zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
617 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
618 {}
619
620 int operate() override;
621};
622
623int MetaTrimPollCR::operate()
624{
625 reenter(this) {
626 for (;;) {
627 set_status("sleeping");
628 wait(interval);
629
630 // prevent others from trimming for our entire wait interval
631 set_status("acquiring trim lock");
632 yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
633 obj, name, cookie, interval.sec()));
634 if (retcode < 0) {
635 ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
636 continue;
637 }
638
639 set_status("trimming");
640 yield call(alloc_cr());
641
642 if (retcode < 0) {
643 // on errors, unlock so other gateways can try
644 set_status("unlocking");
645 yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
646 obj, name, cookie));
647 }
648 }
649 }
650 return 0;
651}
652
653class MetaMasterTrimPollCR : public MetaTrimPollCR {
654 MasterTrimEnv env; //< trim state to share between calls
655 RGWCoroutine* alloc_cr() override {
656 return new MetaMasterTrimCR(env);
657 }
658 public:
659 MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
660 int num_shards, utime_t interval)
661 : MetaTrimPollCR(store, interval),
662 env(dpp, store, http, num_shards)
663 {}
664};
665
666class MetaPeerTrimPollCR : public MetaTrimPollCR {
667 PeerTrimEnv env; //< trim state to share between calls
668 RGWCoroutine* alloc_cr() override {
669 return new MetaPeerTrimCR(env);
670 }
671 public:
672 MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
673 int num_shards, utime_t interval)
674 : MetaTrimPollCR(store, interval),
675 env(dpp, store, http, num_shards)
676 {}
677};
678
679RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
680 int num_shards, utime_t interval)
681{
682 if (store->svc()->zone->is_meta_master()) {
683 return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval);
684 }
685 return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval);
686}
687
688
689struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
690 MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
691 : MasterTrimEnv(dpp, store, http, num_shards),
692 MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
693 {}
694};
695
696struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
697 MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
698 : PeerTrimEnv(dpp, store, http, num_shards),
699 MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
700 {}
701};
702
703RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store,
704 RGWHTTPManager *http,
705 int num_shards)
706{
707 if (store->svc()->zone->is_meta_master()) {
708 return new MetaMasterAdminTrimCR(dpp, store, http, num_shards);
709 }
710 return new MetaPeerAdminTrimCR(dpp, store, http, num_shards);
711}