// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <vector>
#include <string>

#include "common/errno.h"

#include "rgw_trim_datalog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_datalog.h"
#include "rgw_data_sync.h"
#include "rgw_zone.h"
#include "rgw_bucket.h"

#include "services/svc_zone.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")

namespace {

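// Simple coroutine wrapping an asynchronous trim_entries() call on a single
// datalog shard. Once the shard has nothing left to trim, the trimmed marker
// is recorded in *last_trim_marker.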
class DatalogTrimImplCR : public RGWSimpleCoroutine {
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
  int shard;
  std::string marker;
  std::string* last_trim_marker;

 public:
  DatalogTrimImplCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, int shard,
                    const std::string& marker, std::string* last_trim_marker)
    : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), shard(shard),
      marker(marker), last_trim_marker(last_trim_marker) {
    set_description() << "Datalog trim shard=" << shard
                      << " marker=" << marker;
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    set_status() << "sending request";
    cn = stack->create_completion_notifier();
    return store->svc()->datalog_rados->trim_entries(dpp, shard, marker,
                                                     cn->completion());
  }
  int request_complete() override {
    int r = cn->completion()->get_return_value();
    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "(): trim of shard=" << shard
                       << " marker=" << marker << " returned r=" << r << dendl;

    set_status() << "request complete; ret=" << r;
    if (r != -ENODATA) {
      return r;
    }
    // nothing left to trim, update last_trim_marker
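    // (but never record the max_marker sentinel, or DataLogTrimCR would treat
    // this shard as already fully trimmed and skip it on every later pass)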
    if (*last_trim_marker < marker &&
        marker != store->svc()->datalog_rados->max_marker()) {
      *last_trim_marker = marker;
    }
    return 0;
  }
};

/// return the marker that it's safe to trim up to
const std::string& get_stable_marker(const rgw_data_sync_marker& m)
{
  return m.state == m.FullSync ? m.next_step_marker : m.marker;
}

/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
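/// 'dest' must have room for at least as many entries as each peer reports
/// sync markers for (one per shard)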
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  for (auto p = first; p != last; ++p) {
    auto m = dest;
    for (auto &shard : p->sync_markers) {
      const auto& stable = get_stable_marker(shard.second);
      if (*m > stable) {
        *m = stable;
      }
      ++m;
    }
  }
}

} // anonymous namespace

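// One-shot trim pass: fetch the data sync status from every peer zone that
// syncs from this zone, take the minimum stable marker per shard, then trim
// each datalog shard up to that marker.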
class DataLogTrimCR : public RGWCoroutine {
  using TrimCR = DatalogTrimImplCR;
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  RGWHTTPManager *http;
  const int num_shards;
  const std::string& zone_id; //< my zone id
  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
  std::vector<std::string> min_shard_markers; //< min marker per shard
  std::vector<std::string>& last_trim; //< last trimmed marker per shard
  int ret{0};

 public:
  DataLogTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, RGWHTTPManager *http,
                int num_shards, std::vector<std::string>& last_trim)
    : RGWCoroutine(store->ctx()), dpp(dpp), store(store), http(http),
      num_shards(num_shards),
      zone_id(store->svc()->zone->get_zone().id),
      peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()),
      min_shard_markers(num_shards,
                        std::string(store->svc()->datalog_rados->max_marker())),
      last_trim(last_trim)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

int DataLogTrimCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    ldpp_dout(dpp, 10) << "fetching sync status for zone " << zone_id << dendl;
    set_status("fetching sync status");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "data" },
        { "status", nullptr },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };

      auto p = peer_status.begin();
      for (auto& c : store->svc()->zone->get_zone_data_notify_to_map()) {
        ldpp_dout(dpp, 20) << "query sync status from " << c.first << dendl;
        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }

    // must get a successful reply from all peers to consider trimming
    ret = 0;
    while (ret == 0 && num_spawned() > 0) {
      yield wait_for_child();
      collect_next(&ret);
    }
    drain_all();

    if (ret < 0) {
      ldpp_dout(dpp, 4) << "failed to fetch sync status from all peers" << dendl;
      return set_cr_error(ret);
    }

    ldpp_dout(dpp, 10) << "trimming log shards" << dendl;
    set_status("trimming log shards");
    yield {
      // determine the minimum marker for each shard
      take_min_markers(peer_status.begin(), peer_status.end(),
                       min_shard_markers.begin());

      for (int i = 0; i < num_shards; i++) {
        const auto& m = min_shard_markers[i];
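        // peers haven't advanced past the last trimmed position; nothing to do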
        if (m <= last_trim[i]) {
          continue;
        }
        ldpp_dout(dpp, 10) << "trimming log shard " << i
                           << " at marker=" << m
                           << " last_trim=" << last_trim[i] << dendl;
        spawn(new TrimCR(dpp, store, i, m, &last_trim[i]),
              true);
      }
    }
    return set_cr_done();
  }
  return 0;
}

RGWCoroutine* create_admin_data_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store,
                                            RGWHTTPManager *http,
                                            int num_shards,
                                            std::vector<std::string>& markers)
{
  return new DataLogTrimCR(dpp, store, http, num_shards, markers);
}

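// Long-running coroutine that wakes up every 'interval', takes a 'data_trim'
// lock on the first datalog shard object, and runs a DataLogTrimCR pass. The
// lock is never explicitly released, so other gateways skip the same work.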
class DataLogTrimPollCR : public RGWCoroutine {
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  RGWHTTPManager *http;
  const int num_shards;
  const utime_t interval; //< polling interval
  const std::string lock_oid; //< use first data log shard for lock
  const std::string lock_cookie;
  std::vector<std::string> last_trim; //< last trimmed marker per shard

 public:
  DataLogTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store, RGWHTTPManager *http,
                    int num_shards, utime_t interval)
    : RGWCoroutine(store->ctx()), dpp(dpp), store(store), http(http),
      num_shards(num_shards), interval(interval),
      lock_oid(store->svc()->datalog_rados->get_oid(0, 0)),
      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      last_trim(num_shards)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

int DataLogTrimPollCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // request a 'data_trim' lock that covers the entire wait interval to
      // prevent other gateways from attempting to trim for the duration
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, lock_oid),
                                          "data_trim", lock_cookie,
                                          interval.sec()));
      if (retcode < 0) {
        // if the lock is already held, go back to sleep and try again later
        ldpp_dout(dpp, 4) << "failed to lock " << lock_oid << ", trying again in "
                          << interval.sec() << "s" << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new DataLogTrimCR(dpp, store, http, num_shards, last_trim));

      // note that the lock is not released. this is intentional, as it avoids
      // duplicating this work in other gateways
    }
  }
  return 0;
}

RGWCoroutine* create_data_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* store,
                                      RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  return new DataLogTrimPollCR(dpp, store, http, num_shards, interval);
}