1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "common/errno.h"
9 #include "rgw_trim_datalog.h"
10 #include "rgw_cr_rados.h"
11 #include "rgw_cr_rest.h"
12 #include "rgw_datalog.h"
13 #include "rgw_data_sync.h"
15 #include "rgw_bucket.h"
17 #include "services/svc_zone.h"
19 #include <boost/asio/yield.hpp>
21 #define dout_subsys ceph_subsys_rgw
24 #define dout_prefix (*_dout << "data trim: ")
28 class DatalogTrimImplCR
: public RGWSimpleCoroutine
{
29 const DoutPrefixProvider
*dpp
;
30 rgw::sal::RadosStore
* store
;
31 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
34 std::string
* last_trim_marker
;
37 DatalogTrimImplCR(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* store
, int shard
,
38 const std::string
& marker
, std::string
* last_trim_marker
)
39 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
), shard(shard
),
40 marker(marker
), last_trim_marker(last_trim_marker
) {
41 set_description() << "Datalog trim shard=" << shard
42 << " marker=" << marker
;
45 int send_request(const DoutPrefixProvider
*dpp
) override
{
46 set_status() << "sending request";
47 cn
= stack
->create_completion_notifier();
48 return store
->svc()->datalog_rados
->trim_entries(dpp
, shard
, marker
,
51 int request_complete() override
{
52 int r
= cn
->completion()->get_return_value();
53 ldpp_dout(dpp
, 20) << __PRETTY_FUNCTION__
<< "(): trim of shard=" << shard
54 << " marker=" << marker
<< " returned r=" << r
<< dendl
;
56 set_status() << "request complete; ret=" << r
;
60 // nothing left to trim, update last_trim_marker
61 if (*last_trim_marker
< marker
&&
62 marker
!= store
->svc()->datalog_rados
->max_marker()) {
63 *last_trim_marker
= marker
;
69 /// return the marker that it's safe to trim up to
70 const std::string
& get_stable_marker(const rgw_data_sync_marker
& m
)
72 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  for (auto p = first; p != last; ++p) {
    // walk this peer's per-shard markers in parallel with the output range
    auto m = dest;
    for (auto &shard : p->sync_markers) {
      const auto& stable = get_stable_marker(shard.second);
      if (*m > stable) {
        *m = stable; // keep the minimum across all peers
      }
      ++m;
    }
  }
}
95 } // anonymous namespace
97 class DataLogTrimCR
: public RGWCoroutine
{
98 using TrimCR
= DatalogTrimImplCR
;
99 const DoutPrefixProvider
*dpp
;
100 rgw::sal::RadosStore
* store
;
101 RGWHTTPManager
*http
;
102 const int num_shards
;
103 const std::string
& zone_id
; //< my zone id
104 std::vector
<rgw_data_sync_status
> peer_status
; //< sync status for each peer
105 std::vector
<std::string
> min_shard_markers
; //< min marker per shard
106 std::vector
<std::string
>& last_trim
; //< last trimmed marker per shard
110 DataLogTrimCR(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* store
, RGWHTTPManager
*http
,
111 int num_shards
, std::vector
<std::string
>& last_trim
)
112 : RGWCoroutine(store
->ctx()), dpp(dpp
), store(store
), http(http
),
113 num_shards(num_shards
),
114 zone_id(store
->svc()->zone
->get_zone().id
),
115 peer_status(store
->svc()->zone
->get_zone_data_notify_to_map().size()),
116 min_shard_markers(num_shards
,
117 std::string(store
->svc()->datalog_rados
->max_marker())),
121 int operate(const DoutPrefixProvider
*dpp
) override
;
124 int DataLogTrimCR::operate(const DoutPrefixProvider
*dpp
)
127 ldpp_dout(dpp
, 10) << "fetching sync status for zone " << zone_id
<< dendl
;
128 set_status("fetching sync status");
130 // query data sync status from each sync peer
131 rgw_http_param_pair params
[] = {
133 { "status", nullptr },
134 { "source-zone", zone_id
.c_str() },
138 auto p
= peer_status
.begin();
139 for (auto& c
: store
->svc()->zone
->get_zone_data_notify_to_map()) {
140 ldpp_dout(dpp
, 20) << "query sync status from " << c
.first
<< dendl
;
141 using StatusCR
= RGWReadRESTResourceCR
<rgw_data_sync_status
>;
142 spawn(new StatusCR(cct
, c
.second
, http
, "/admin/log/", params
, &*p
),
148 // must get a successful reply from all peers to consider trimming
150 while (ret
== 0 && num_spawned() > 0) {
151 yield
wait_for_child();
157 ldpp_dout(dpp
, 4) << "failed to fetch sync status from all peers" << dendl
;
158 return set_cr_error(ret
);
161 ldpp_dout(dpp
, 10) << "trimming log shards" << dendl
;
162 set_status("trimming log shards");
164 // determine the minimum marker for each shard
165 take_min_markers(peer_status
.begin(), peer_status
.end(),
166 min_shard_markers
.begin());
168 for (int i
= 0; i
< num_shards
; i
++) {
169 const auto& m
= min_shard_markers
[i
];
170 if (m
<= last_trim
[i
]) {
173 ldpp_dout(dpp
, 10) << "trimming log shard " << i
174 << " at marker=" << m
175 << " last_trim=" << last_trim
[i
] << dendl
;
176 spawn(new TrimCR(dpp
, store
, i
, m
, &last_trim
[i
]),
180 return set_cr_done();
185 RGWCoroutine
* create_admin_data_log_trim_cr(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* store
,
186 RGWHTTPManager
*http
,
188 std::vector
<std::string
>& markers
)
190 return new DataLogTrimCR(dpp
, store
, http
, num_shards
, markers
);
193 class DataLogTrimPollCR
: public RGWCoroutine
{
194 const DoutPrefixProvider
*dpp
;
195 rgw::sal::RadosStore
* store
;
196 RGWHTTPManager
*http
;
197 const int num_shards
;
198 const utime_t interval
; //< polling interval
199 const std::string lock_oid
; //< use first data log shard for lock
200 const std::string lock_cookie
;
201 std::vector
<std::string
> last_trim
; //< last trimmed marker per shard
204 DataLogTrimPollCR(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* store
, RGWHTTPManager
*http
,
205 int num_shards
, utime_t interval
)
206 : RGWCoroutine(store
->ctx()), dpp(dpp
), store(store
), http(http
),
207 num_shards(num_shards
), interval(interval
),
208 lock_oid(store
->svc()->datalog_rados
->get_oid(0, 0)),
209 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
210 last_trim(num_shards
)
213 int operate(const DoutPrefixProvider
*dpp
) override
;
216 int DataLogTrimPollCR::operate(const DoutPrefixProvider
*dpp
)
220 set_status("sleeping");
223 // request a 'data_trim' lock that covers the entire wait interval to
224 // prevent other gateways from attempting to trim for the duration
225 set_status("acquiring trim lock");
226 yield
call(new RGWSimpleRadosLockCR(store
->svc()->rados
->get_async_processor(), store
,
227 rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, lock_oid
),
228 "data_trim", lock_cookie
,
231 // if the lock is already held, go back to sleep and try again later
232 ldpp_dout(dpp
, 4) << "failed to lock " << lock_oid
<< ", trying again in "
233 << interval
.sec() << "s" << dendl
;
237 set_status("trimming");
238 yield
call(new DataLogTrimCR(dpp
, store
, http
, num_shards
, last_trim
));
240 // note that the lock is not released. this is intentional, as it avoids
241 // duplicating this work in other gateways
247 RGWCoroutine
* create_data_log_trim_cr(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* store
,
248 RGWHTTPManager
*http
,
249 int num_shards
, utime_t interval
)
251 return new DataLogTrimPollCR(dpp
, store
, http
, num_shards
, interval
);