]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include <vector> | |
5 | ||
6 | #include "common/debug.h" | |
7 | #include "common/containers.h" | |
8 | #include "common/errno.h" | |
9 | #include "common/error_code.h" | |
10 | ||
11 | #include "common/async/blocked_completion.h" | |
12 | #include "common/async/librados_completion.h" | |
13 | ||
14 | #include "cls/fifo/cls_fifo_types.h" | |
15 | #include "cls/log/cls_log_client.h" | |
16 | ||
17 | #include "cls_fifo_legacy.h" | |
18 | #include "rgw_datalog.h" | |
19 | #include "rgw_log_backing.h" | |
20 | #include "rgw_tools.h" | |
21 | ||
22 | #define dout_context g_ceph_context | |
23 | static constexpr auto dout_subsys = ceph_subsys_rgw; | |
24 | ||
25 | namespace bs = boost::system; | |
26 | namespace lr = librados; | |
27 | ||
28 | using ceph::containers::tiny_vector; | |
29 | ||
30 | void rgw_data_change::dump(ceph::Formatter *f) const | |
31 | { | |
32 | std::string type; | |
33 | switch (entity_type) { | |
34 | case ENTITY_TYPE_BUCKET: | |
35 | type = "bucket"; | |
36 | break; | |
37 | default: | |
38 | type = "unknown"; | |
39 | } | |
40 | encode_json("entity_type", type, f); | |
41 | encode_json("key", key, f); | |
42 | utime_t ut(timestamp); | |
43 | encode_json("timestamp", ut, f); | |
44 | } | |
45 | ||
46 | void rgw_data_change::decode_json(JSONObj *obj) { | |
47 | std::string s; | |
48 | JSONDecoder::decode_json("entity_type", s, obj); | |
49 | if (s == "bucket") { | |
50 | entity_type = ENTITY_TYPE_BUCKET; | |
51 | } else { | |
52 | entity_type = ENTITY_TYPE_UNKNOWN; | |
53 | } | |
54 | JSONDecoder::decode_json("key", key, obj); | |
55 | utime_t ut; | |
56 | JSONDecoder::decode_json("timestamp", ut, obj); | |
57 | timestamp = ut.to_real_time(); | |
58 | } | |
59 | ||
60 | void rgw_data_change_log_entry::dump(Formatter *f) const | |
61 | { | |
62 | encode_json("log_id", log_id, f); | |
63 | utime_t ut(log_timestamp); | |
64 | encode_json("log_timestamp", ut, f); | |
65 | encode_json("entry", entry, f); | |
66 | } | |
67 | ||
68 | void rgw_data_change_log_entry::decode_json(JSONObj *obj) { | |
69 | JSONDecoder::decode_json("log_id", log_id, obj); | |
70 | utime_t ut; | |
71 | JSONDecoder::decode_json("log_timestamp", ut, obj); | |
72 | log_timestamp = ut.to_real_time(); | |
73 | JSONDecoder::decode_json("entry", entry, obj); | |
74 | } | |
75 | ||
76 | class RGWDataChangesOmap final : public RGWDataChangesBE { | |
77 | using centries = std::list<cls_log_entry>; | |
78 | std::vector<std::string> oids; | |
79 | ||
80 | public: | |
81 | RGWDataChangesOmap(lr::IoCtx& ioctx, | |
82 | RGWDataChangesLog& datalog, | |
83 | uint64_t gen_id, | |
84 | int num_shards) | |
85 | : RGWDataChangesBE(ioctx, datalog, gen_id) { | |
86 | oids.reserve(num_shards); | |
87 | for (auto i = 0; i < num_shards; ++i) { | |
88 | oids.push_back(get_oid(i)); | |
89 | } | |
90 | } | |
91 | ~RGWDataChangesOmap() override = default; | |
b3b6e05e | 92 | |
f67539c2 TL |
93 | void prepare(ceph::real_time ut, const std::string& key, |
94 | ceph::buffer::list&& entry, entries& out) override { | |
95 | if (!std::holds_alternative<centries>(out)) { | |
96 | ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out)); | |
97 | out = centries(); | |
98 | } | |
99 | ||
100 | cls_log_entry e; | |
101 | cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry); | |
102 | std::get<centries>(out).push_back(std::move(e)); | |
103 | } | |
b3b6e05e | 104 | int push(const DoutPrefixProvider *dpp, int index, entries&& items) override { |
f67539c2 TL |
105 | lr::ObjectWriteOperation op; |
106 | cls_log_add(op, std::get<centries>(items), true); | |
b3b6e05e | 107 | auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield); |
f67539c2 | 108 | if (r < 0) { |
b3b6e05e | 109 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
110 | << ": failed to push to " << oids[index] << cpp_strerror(-r) |
111 | << dendl; | |
112 | } | |
113 | return r; | |
114 | } | |
b3b6e05e | 115 | int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now, |
f67539c2 TL |
116 | const std::string& key, |
117 | ceph::buffer::list&& bl) override { | |
118 | lr::ObjectWriteOperation op; | |
119 | cls_log_add(op, utime_t(now), {}, key, bl); | |
b3b6e05e | 120 | auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield); |
f67539c2 | 121 | if (r < 0) { |
b3b6e05e | 122 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
123 | << ": failed to push to " << oids[index] |
124 | << cpp_strerror(-r) << dendl; | |
125 | } | |
126 | return r; | |
127 | } | |
b3b6e05e | 128 | int list(const DoutPrefixProvider *dpp, int index, int max_entries, |
f67539c2 TL |
129 | std::vector<rgw_data_change_log_entry>& entries, |
130 | std::optional<std::string_view> marker, | |
131 | std::string* out_marker, bool* truncated) override { | |
132 | std::list<cls_log_entry> log_entries; | |
133 | lr::ObjectReadOperation op; | |
134 | cls_log_list(op, {}, {}, std::string(marker.value_or("")), | |
135 | max_entries, log_entries, out_marker, truncated); | |
b3b6e05e | 136 | auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield); |
f67539c2 TL |
137 | if (r == -ENOENT) { |
138 | *truncated = false; | |
139 | return 0; | |
140 | } | |
141 | if (r < 0) { | |
b3b6e05e | 142 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
143 | << ": failed to list " << oids[index] |
144 | << cpp_strerror(-r) << dendl; | |
145 | return r; | |
146 | } | |
147 | for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) { | |
148 | rgw_data_change_log_entry log_entry; | |
149 | log_entry.log_id = iter->id; | |
150 | auto rt = iter->timestamp.to_real_time(); | |
151 | log_entry.log_timestamp = rt; | |
152 | auto liter = iter->data.cbegin(); | |
153 | try { | |
154 | decode(log_entry.entry, liter); | |
155 | } catch (ceph::buffer::error& err) { | |
b3b6e05e | 156 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
157 | << ": failed to decode data changes log entry: " |
158 | << err.what() << dendl; | |
159 | return -EIO; | |
160 | } | |
161 | entries.push_back(log_entry); | |
162 | } | |
163 | return 0; | |
164 | } | |
b3b6e05e | 165 | int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override { |
f67539c2 TL |
166 | cls_log_header header; |
167 | lr::ObjectReadOperation op; | |
168 | cls_log_info(op, &header); | |
b3b6e05e | 169 | auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield); |
f67539c2 TL |
170 | if (r == -ENOENT) r = 0; |
171 | if (r < 0) { | |
b3b6e05e | 172 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
173 | << ": failed to get info from " << oids[index] |
174 | << cpp_strerror(-r) << dendl; | |
175 | } else { | |
176 | info->marker = header.max_marker; | |
177 | info->last_update = header.max_time.to_real_time(); | |
178 | } | |
179 | return r; | |
180 | } | |
b3b6e05e | 181 | int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override { |
f67539c2 TL |
182 | lr::ObjectWriteOperation op; |
183 | cls_log_trim(op, {}, {}, {}, std::string(marker)); | |
b3b6e05e | 184 | auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield); |
f67539c2 TL |
185 | if (r == -ENOENT) r = -ENODATA; |
186 | if (r < 0 && r != -ENODATA) { | |
b3b6e05e | 187 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
188 | << ": failed to get info from " << oids[index] |
189 | << cpp_strerror(-r) << dendl; | |
190 | } | |
191 | return r; | |
192 | } | |
b3b6e05e | 193 | int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, |
f67539c2 TL |
194 | lr::AioCompletion* c) override { |
195 | lr::ObjectWriteOperation op; | |
196 | cls_log_trim(op, {}, {}, {}, std::string(marker)); | |
197 | auto r = ioctx.aio_operate(oids[index], c, &op, 0); | |
198 | if (r == -ENOENT) r = -ENODATA; | |
199 | if (r < 0) { | |
20effc67 | 200 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
201 | << ": failed to get info from " << oids[index] |
202 | << cpp_strerror(-r) << dendl; | |
203 | } | |
204 | return r; | |
205 | } | |
206 | std::string_view max_marker() const override { | |
20effc67 | 207 | return "99999999"; |
f67539c2 | 208 | } |
b3b6e05e | 209 | int is_empty(const DoutPrefixProvider *dpp) override { |
f67539c2 TL |
210 | for (auto shard = 0u; shard < oids.size(); ++shard) { |
211 | std::list<cls_log_entry> log_entries; | |
212 | lr::ObjectReadOperation op; | |
213 | std::string out_marker; | |
214 | bool truncated; | |
215 | cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated); | |
b3b6e05e | 216 | auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield); |
f67539c2 TL |
217 | if (r == -ENOENT) { |
218 | continue; | |
219 | } | |
220 | if (r < 0) { | |
b3b6e05e | 221 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
222 | << ": failed to list " << oids[shard] |
223 | << cpp_strerror(-r) << dendl; | |
224 | return r; | |
225 | } | |
226 | if (!log_entries.empty()) { | |
227 | return 0; | |
228 | } | |
229 | } | |
230 | return 1; | |
231 | } | |
232 | }; | |
233 | ||
234 | class RGWDataChangesFIFO final : public RGWDataChangesBE { | |
235 | using centries = std::vector<ceph::buffer::list>; | |
236 | tiny_vector<LazyFIFO> fifos; | |
237 | ||
238 | public: | |
239 | RGWDataChangesFIFO(lr::IoCtx& ioctx, | |
240 | RGWDataChangesLog& datalog, | |
241 | uint64_t gen_id, int shards) | |
242 | : RGWDataChangesBE(ioctx, datalog, gen_id), | |
243 | fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) { | |
244 | emplacer.emplace(ioctx, get_oid(i)); | |
245 | }) {} | |
246 | ~RGWDataChangesFIFO() override = default; | |
247 | void prepare(ceph::real_time, const std::string&, | |
248 | ceph::buffer::list&& entry, entries& out) override { | |
249 | if (!std::holds_alternative<centries>(out)) { | |
250 | ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out)); | |
251 | out = centries(); | |
252 | } | |
253 | std::get<centries>(out).push_back(std::move(entry)); | |
254 | } | |
b3b6e05e TL |
255 | int push(const DoutPrefixProvider *dpp, int index, entries&& items) override { |
256 | auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield); | |
f67539c2 | 257 | if (r < 0) { |
b3b6e05e | 258 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
259 | << ": unable to push to FIFO: " << get_oid(index) |
260 | << ": " << cpp_strerror(-r) << dendl; | |
261 | } | |
262 | return r; | |
263 | } | |
b3b6e05e | 264 | int push(const DoutPrefixProvider *dpp, int index, ceph::real_time, |
f67539c2 TL |
265 | const std::string&, |
266 | ceph::buffer::list&& bl) override { | |
b3b6e05e | 267 | auto r = fifos[index].push(dpp, std::move(bl), null_yield); |
f67539c2 | 268 | if (r < 0) { |
b3b6e05e | 269 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
270 | << ": unable to push to FIFO: " << get_oid(index) |
271 | << ": " << cpp_strerror(-r) << dendl; | |
272 | } | |
273 | return r; | |
274 | } | |
b3b6e05e | 275 | int list(const DoutPrefixProvider *dpp, int index, int max_entries, |
f67539c2 TL |
276 | std::vector<rgw_data_change_log_entry>& entries, |
277 | std::optional<std::string_view> marker, | |
278 | std::string* out_marker, bool* truncated) override { | |
279 | std::vector<rgw::cls::fifo::list_entry> log_entries; | |
280 | bool more = false; | |
b3b6e05e | 281 | auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more, |
f67539c2 TL |
282 | null_yield); |
283 | if (r < 0) { | |
b3b6e05e | 284 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
285 | << ": unable to list FIFO: " << get_oid(index) |
286 | << ": " << cpp_strerror(-r) << dendl; | |
287 | return r; | |
288 | } | |
289 | for (const auto& entry : log_entries) { | |
290 | rgw_data_change_log_entry log_entry; | |
291 | log_entry.log_id = entry.marker; | |
292 | log_entry.log_timestamp = entry.mtime; | |
293 | auto liter = entry.data.cbegin(); | |
294 | try { | |
295 | decode(log_entry.entry, liter); | |
296 | } catch (const buffer::error& err) { | |
b3b6e05e | 297 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
298 | << ": failed to decode data changes log entry: " |
299 | << err.what() << dendl; | |
300 | return -EIO; | |
301 | } | |
302 | entries.push_back(std::move(log_entry)); | |
303 | } | |
304 | if (truncated) | |
305 | *truncated = more; | |
306 | if (out_marker && !log_entries.empty()) { | |
307 | *out_marker = log_entries.back().marker; | |
308 | } | |
309 | return 0; | |
310 | } | |
b3b6e05e | 311 | int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override { |
f67539c2 | 312 | auto& fifo = fifos[index]; |
b3b6e05e | 313 | auto r = fifo.read_meta(dpp, null_yield); |
f67539c2 | 314 | if (r < 0) { |
b3b6e05e | 315 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
316 | << ": unable to get FIFO metadata: " << get_oid(index) |
317 | << ": " << cpp_strerror(-r) << dendl; | |
318 | return r; | |
319 | } | |
320 | rados::cls::fifo::info m; | |
b3b6e05e | 321 | fifo.meta(dpp, m, null_yield); |
f67539c2 TL |
322 | auto p = m.head_part_num; |
323 | if (p < 0) { | |
20effc67 | 324 | info->marker = ""; |
f67539c2 TL |
325 | info->last_update = ceph::real_clock::zero(); |
326 | return 0; | |
327 | } | |
328 | rgw::cls::fifo::part_info h; | |
b3b6e05e | 329 | r = fifo.get_part_info(dpp, p, &h, null_yield); |
f67539c2 | 330 | if (r < 0) { |
b3b6e05e | 331 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
332 | << ": unable to get part info: " << get_oid(index) << "/" << p |
333 | << ": " << cpp_strerror(-r) << dendl; | |
334 | return r; | |
335 | } | |
336 | info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string(); | |
337 | info->last_update = h.max_time; | |
338 | return 0; | |
339 | } | |
b3b6e05e TL |
340 | int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override { |
341 | auto r = fifos[index].trim(dpp, marker, false, null_yield); | |
f67539c2 | 342 | if (r < 0) { |
b3b6e05e | 343 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
344 | << ": unable to trim FIFO: " << get_oid(index) |
345 | << ": " << cpp_strerror(-r) << dendl; | |
346 | } | |
347 | return r; | |
348 | } | |
b3b6e05e | 349 | int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker, |
f67539c2 TL |
350 | librados::AioCompletion* c) override { |
351 | int r = 0; | |
352 | if (marker == rgw::cls::fifo::marker(0, 0).to_string()) { | |
353 | rgw_complete_aio_completion(c, -ENODATA); | |
354 | } else { | |
b3b6e05e | 355 | fifos[index].trim(dpp, marker, false, c, null_yield); |
f67539c2 TL |
356 | } |
357 | return r; | |
358 | } | |
359 | std::string_view max_marker() const override { | |
360 | static const std::string mm = | |
361 | rgw::cls::fifo::marker::max().to_string(); | |
362 | return std::string_view(mm); | |
363 | } | |
b3b6e05e | 364 | int is_empty(const DoutPrefixProvider *dpp) override { |
f67539c2 TL |
365 | std::vector<rgw::cls::fifo::list_entry> log_entries; |
366 | bool more = false; | |
367 | for (auto shard = 0u; shard < fifos.size(); ++shard) { | |
b3b6e05e | 368 | auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more, |
f67539c2 TL |
369 | null_yield); |
370 | if (r < 0) { | |
b3b6e05e | 371 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
372 | << ": unable to list FIFO: " << get_oid(shard) |
373 | << ": " << cpp_strerror(-r) << dendl; | |
374 | return r; | |
375 | } | |
376 | if (!log_entries.empty()) { | |
377 | return 0; | |
378 | } | |
379 | } | |
380 | return 1; | |
381 | } | |
382 | }; | |
383 | ||
384 | RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) | |
385 | : cct(cct), | |
386 | num_shards(cct->_conf->rgw_data_log_num_shards), | |
387 | prefix(get_prefix()), | |
388 | changes(cct->_conf->rgw_data_log_changes_size) {} | |
389 | ||
390 | bs::error_code DataLogBackends::handle_init(entries_t e) noexcept { | |
391 | std::unique_lock l(m); | |
392 | ||
393 | for (const auto& [gen_id, gen] : e) { | |
394 | if (gen.pruned) { | |
395 | lderr(datalog.cct) | |
396 | << __PRETTY_FUNCTION__ << ":" << __LINE__ | |
397 | << ": ERROR: given empty generation: gen_id=" << gen_id << dendl; | |
398 | } | |
399 | if (count(gen_id) != 0) { | |
400 | lderr(datalog.cct) | |
401 | << __PRETTY_FUNCTION__ << ":" << __LINE__ | |
402 | << ": ERROR: generation already exists: gen_id=" << gen_id << dendl; | |
403 | } | |
404 | try { | |
405 | switch (gen.type) { | |
406 | case log_type::omap: | |
407 | emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards)); | |
408 | break; | |
409 | case log_type::fifo: | |
410 | emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards)); | |
411 | break; | |
412 | default: | |
413 | lderr(datalog.cct) | |
414 | << __PRETTY_FUNCTION__ << ":" << __LINE__ | |
415 | << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id | |
416 | << ", type" << gen.type << dendl; | |
417 | return bs::error_code(EFAULT, bs::system_category()); | |
418 | } | |
419 | } catch (const bs::system_error& err) { | |
420 | lderr(datalog.cct) | |
421 | << __PRETTY_FUNCTION__ << ":" << __LINE__ | |
422 | << ": error setting up backend: gen_id=" << gen_id | |
423 | << ", err=" << err.what() << dendl; | |
424 | return err.code(); | |
425 | } | |
426 | } | |
427 | return {}; | |
428 | } | |
429 | bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept { | |
430 | return handle_init(std::move(e)); | |
431 | } | |
432 | bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept { | |
433 | std::unique_lock l(m); | |
434 | auto i = cbegin(); | |
435 | if (i->first < new_tail) { | |
436 | return {}; | |
437 | } | |
438 | if (new_tail >= (cend() - 1)->first) { | |
439 | lderr(datalog.cct) | |
440 | << __PRETTY_FUNCTION__ << ":" << __LINE__ | |
441 | << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl; | |
442 | return bs::error_code(EFAULT, bs::system_category()); | |
443 | } | |
444 | erase(i, upper_bound(new_tail)); | |
445 | return {}; | |
446 | } | |
447 | ||
448 | ||
20effc67 | 449 | int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone, |
f67539c2 TL |
450 | const RGWZoneParams& zoneparams, |
451 | librados::Rados* lr) | |
452 | { | |
453 | zone = _zone; | |
454 | ceph_assert(zone); | |
455 | auto defbacking = to_log_type( | |
456 | cct->_conf.get_val<std::string>("rgw_default_data_log_backing")); | |
457 | // Should be guaranteed by `set_enum_allowed` | |
458 | ceph_assert(defbacking); | |
459 | auto log_pool = zoneparams.log_pool; | |
b3b6e05e | 460 | auto r = rgw_init_ioctx(dpp, lr, log_pool, ioctx, true, false); |
f67539c2 | 461 | if (r < 0) { |
b3b6e05e | 462 | ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ |
f67539c2 TL |
463 | << ": Failed to initialized ioctx, r=" << r |
464 | << ", pool=" << log_pool << dendl; | |
465 | return -r; | |
466 | } | |
467 | ||
468 | auto besr = logback_generations::init<DataLogBackends>( | |
b3b6e05e | 469 | dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) { |
f67539c2 TL |
470 | return get_oid(gen_id, shard); |
471 | }, | |
472 | num_shards, *defbacking, null_yield, *this); | |
473 | ||
474 | ||
475 | if (!besr) { | |
20effc67 | 476 | lderr(cct) << __PRETTY_FUNCTION__ |
f67539c2 TL |
477 | << ": Error initializing backends: " |
478 | << besr.error().message() << dendl; | |
479 | return ceph::from_error_code(besr.error()); | |
480 | } | |
481 | ||
482 | bes = std::move(*besr); | |
483 | renew_thread = make_named_thread("rgw_dt_lg_renew", | |
484 | &RGWDataChangesLog::renew_run, this); | |
485 | return 0; | |
486 | } | |
487 | ||
488 | int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { | |
489 | const auto& name = bs.bucket.name; | |
490 | auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); | |
491 | auto r = (ceph_str_hash_linux(name.data(), name.size()) + | |
492 | shard_shift) % num_shards; | |
493 | return static_cast<int>(r); | |
494 | } | |
495 | ||
b3b6e05e | 496 | int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) |
f67539c2 TL |
497 | { |
498 | if (!zone->log_data) | |
499 | return 0; | |
500 | ||
501 | /* we can't keep the bucket name as part of the cls_log_entry, and we need | |
502 | * it later, so we keep two lists under the map */ | |
503 | bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>, | |
504 | RGWDataChangesBE::entries>> m; | |
505 | ||
506 | std::unique_lock l(lock); | |
507 | decltype(cur_cycle) entries; | |
508 | entries.swap(cur_cycle); | |
509 | l.unlock(); | |
510 | ||
511 | auto ut = real_clock::now(); | |
512 | auto be = bes->head(); | |
513 | for (const auto& bs : entries) { | |
514 | auto index = choose_oid(bs); | |
515 | ||
516 | rgw_data_change change; | |
517 | bufferlist bl; | |
518 | change.entity_type = ENTITY_TYPE_BUCKET; | |
519 | change.key = bs.get_key(); | |
520 | change.timestamp = ut; | |
521 | encode(change, bl); | |
522 | ||
523 | m[index].first.push_back(bs); | |
524 | be->prepare(ut, change.key, std::move(bl), m[index].second); | |
525 | } | |
526 | ||
527 | for (auto& [index, p] : m) { | |
528 | auto& [buckets, entries] = p; | |
529 | ||
530 | auto now = real_clock::now(); | |
531 | ||
b3b6e05e | 532 | auto ret = be->push(dpp, index, std::move(entries)); |
f67539c2 TL |
533 | if (ret < 0) { |
534 | /* we don't really need to have a special handling for failed cases here, | |
535 | * as this is just an optimization. */ | |
b3b6e05e | 536 | ldpp_dout(dpp, -1) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl; |
f67539c2 TL |
537 | return ret; |
538 | } | |
539 | ||
540 | auto expiration = now; | |
541 | expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); | |
542 | for (auto& bs : buckets) { | |
543 | update_renewed(bs, expiration); | |
544 | } | |
545 | } | |
546 | ||
547 | return 0; | |
548 | } | |
549 | ||
550 | void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, | |
551 | ChangeStatusPtr& status) | |
552 | { | |
553 | ceph_assert(ceph_mutex_is_locked(lock)); | |
554 | if (!changes.find(bs, status)) { | |
555 | status = ChangeStatusPtr(new ChangeStatus); | |
556 | changes.add(bs, status); | |
557 | } | |
558 | } | |
559 | ||
560 | void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs) | |
561 | { | |
562 | std::scoped_lock l{lock}; | |
563 | cur_cycle.insert(bs); | |
564 | } | |
565 | ||
566 | void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs, | |
567 | real_time expiration) | |
568 | { | |
569 | std::scoped_lock l{lock}; | |
570 | ChangeStatusPtr status; | |
571 | _get_change(bs, status); | |
572 | ||
573 | ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" | |
574 | << bs.bucket.name << " shard_id=" << bs.shard_id | |
575 | << " expiration=" << expiration << dendl; | |
576 | status->cur_expiration = expiration; | |
577 | } | |
578 | ||
579 | int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) { | |
580 | rgw_bucket_shard bs(bucket, shard_id); | |
581 | return choose_oid(bs); | |
582 | } | |
583 | ||
b3b6e05e TL |
584 | bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp, |
585 | const rgw_bucket& bucket, | |
f67539c2 TL |
586 | optional_yield y) const |
587 | { | |
588 | if (!bucket_filter) { | |
589 | return true; | |
590 | } | |
591 | ||
b3b6e05e | 592 | return bucket_filter(bucket, y, dpp); |
f67539c2 TL |
593 | } |
594 | ||
595 | std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const { | |
596 | return (gen_id > 0 ? | |
597 | fmt::format("{}@G{}.{}", prefix, gen_id, i) : | |
598 | fmt::format("{}.{}", prefix, i)); | |
599 | } | |
600 | ||
b3b6e05e | 601 | int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) { |
f67539c2 TL |
602 | auto& bucket = bucket_info.bucket; |
603 | ||
b3b6e05e | 604 | if (!filter_bucket(dpp, bucket, null_yield)) { |
f67539c2 TL |
605 | return 0; |
606 | } | |
607 | ||
608 | if (observer) { | |
609 | observer->on_bucket_changed(bucket.get_key()); | |
610 | } | |
611 | ||
612 | rgw_bucket_shard bs(bucket, shard_id); | |
613 | ||
614 | int index = choose_oid(bs); | |
615 | mark_modified(index, bs); | |
616 | ||
617 | std::unique_lock l(lock); | |
618 | ||
619 | ChangeStatusPtr status; | |
620 | _get_change(bs, status); | |
621 | l.unlock(); | |
622 | ||
623 | auto now = real_clock::now(); | |
624 | ||
625 | std::unique_lock sl(status->lock); | |
626 | ||
b3b6e05e | 627 | ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name |
f67539c2 TL |
628 | << " shard_id=" << shard_id << " now=" << now |
629 | << " cur_expiration=" << status->cur_expiration << dendl; | |
630 | ||
631 | if (now < status->cur_expiration) { | |
632 | /* no need to send, recently completed */ | |
633 | sl.unlock(); | |
634 | register_renew(bs); | |
635 | return 0; | |
636 | } | |
637 | ||
638 | RefCountedCond* cond; | |
639 | ||
640 | if (status->pending) { | |
641 | cond = status->cond; | |
642 | ||
643 | ceph_assert(cond); | |
644 | ||
645 | status->cond->get(); | |
646 | sl.unlock(); | |
647 | ||
648 | int ret = cond->wait(); | |
649 | cond->put(); | |
650 | if (!ret) { | |
651 | register_renew(bs); | |
652 | } | |
653 | return ret; | |
654 | } | |
655 | ||
656 | status->cond = new RefCountedCond; | |
657 | status->pending = true; | |
658 | ||
659 | ceph::real_time expiration; | |
660 | ||
661 | int ret; | |
662 | ||
663 | do { | |
664 | status->cur_sent = now; | |
665 | ||
666 | expiration = now; | |
667 | expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); | |
668 | ||
669 | sl.unlock(); | |
670 | ||
671 | ceph::buffer::list bl; | |
672 | rgw_data_change change; | |
673 | change.entity_type = ENTITY_TYPE_BUCKET; | |
674 | change.key = bs.get_key(); | |
675 | change.timestamp = now; | |
676 | encode(change, bl); | |
677 | ||
b3b6e05e | 678 | ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; |
f67539c2 TL |
679 | |
680 | auto be = bes->head(); | |
b3b6e05e | 681 | ret = be->push(dpp, index, now, change.key, std::move(bl)); |
f67539c2 TL |
682 | |
683 | now = real_clock::now(); | |
684 | ||
685 | sl.lock(); | |
686 | ||
687 | } while (!ret && real_clock::now() > expiration); | |
688 | ||
689 | cond = status->cond; | |
690 | ||
691 | status->pending = false; | |
692 | /* time of when operation started, not completed */ | |
693 | status->cur_expiration = status->cur_sent; | |
694 | status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window); | |
695 | status->cond = nullptr; | |
696 | sl.unlock(); | |
697 | ||
698 | cond->done(ret); | |
699 | cond->put(); | |
700 | ||
701 | return ret; | |
702 | } | |
703 | ||
b3b6e05e | 704 | int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 | 705 | std::vector<rgw_data_change_log_entry>& entries, |
522d829b TL |
706 | std::string_view marker, |
707 | std::string* out_marker, | |
708 | bool* truncated) | |
f67539c2 | 709 | { |
522d829b | 710 | const auto [start_id, start_cursor] = cursorgen(marker); |
f67539c2 TL |
711 | auto gen_id = start_id; |
712 | std::string out_cursor; | |
713 | while (max_entries > 0) { | |
714 | std::vector<rgw_data_change_log_entry> gentries; | |
715 | std::unique_lock l(m); | |
716 | auto i = lower_bound(gen_id); | |
717 | if (i == end()) return 0; | |
718 | auto be = i->second; | |
719 | l.unlock(); | |
720 | gen_id = be->gen_id; | |
b3b6e05e | 721 | auto r = be->list(dpp, shard, max_entries, gentries, |
f67539c2 TL |
722 | gen_id == start_id ? start_cursor : std::string{}, |
723 | &out_cursor, truncated); | |
724 | if (r < 0) | |
725 | return r; | |
726 | ||
727 | if (out_marker && !out_cursor.empty()) { | |
728 | *out_marker = gencursor(gen_id, out_cursor); | |
729 | } | |
730 | for (auto& g : gentries) { | |
731 | g.log_id = gencursor(gen_id, g.log_id); | |
732 | } | |
522d829b | 733 | if (int s = gentries.size(); s < 0 || s > max_entries) |
f67539c2 TL |
734 | max_entries = 0; |
735 | else | |
736 | max_entries -= gentries.size(); | |
737 | ||
738 | std::move(gentries.begin(), gentries.end(), | |
739 | std::back_inserter(entries)); | |
740 | ++gen_id; | |
741 | } | |
742 | return 0; | |
743 | } | |
744 | ||
b3b6e05e | 745 | int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, |
f67539c2 | 746 | std::vector<rgw_data_change_log_entry>& entries, |
522d829b | 747 | std::string_view marker, |
f67539c2 TL |
748 | std::string* out_marker, bool* truncated) |
749 | { | |
750 | assert(shard < num_shards); | |
b3b6e05e | 751 | return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated); |
f67539c2 TL |
752 | } |
753 | ||
b3b6e05e | 754 | int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries, |
f67539c2 TL |
755 | std::vector<rgw_data_change_log_entry>& entries, |
756 | LogMarker& marker, bool *ptruncated) | |
757 | { | |
758 | bool truncated; | |
759 | entries.clear(); | |
760 | for (; marker.shard < num_shards && int(entries.size()) < max_entries; | |
522d829b | 761 | marker.shard++, marker.marker.clear()) { |
b3b6e05e | 762 | int ret = list_entries(dpp, marker.shard, max_entries - entries.size(), |
f67539c2 TL |
763 | entries, marker.marker, NULL, &truncated); |
764 | if (ret == -ENOENT) { | |
765 | continue; | |
766 | } | |
767 | if (ret < 0) { | |
768 | return ret; | |
769 | } | |
770 | if (truncated) { | |
771 | *ptruncated = true; | |
772 | return 0; | |
773 | } | |
774 | } | |
775 | *ptruncated = (marker.shard < num_shards); | |
776 | return 0; | |
777 | } | |
778 | ||
b3b6e05e | 779 | int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info) |
f67539c2 TL |
780 | { |
781 | assert(shard_id < num_shards); | |
782 | auto be = bes->head(); | |
b3b6e05e | 783 | auto r = be->get_info(dpp, shard_id, info); |
f67539c2 TL |
784 | if (!info->marker.empty()) { |
785 | info->marker = gencursor(be->gen_id, info->marker); | |
786 | } | |
787 | return r; | |
788 | } | |
789 | ||
b3b6e05e | 790 | int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker) |
f67539c2 TL |
791 | { |
792 | auto [target_gen, cursor] = cursorgen(marker); | |
793 | std::unique_lock l(m); | |
794 | const auto head_gen = (end() - 1)->second->gen_id; | |
795 | const auto tail_gen = begin()->first; | |
796 | if (target_gen < tail_gen) return 0; | |
797 | auto r = 0; | |
798 | for (auto be = lower_bound(0)->second; | |
799 | be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0; | |
800 | be = upper_bound(be->gen_id)->second) { | |
801 | l.unlock(); | |
802 | auto c = be->gen_id == target_gen ? cursor : be->max_marker(); | |
b3b6e05e | 803 | r = be->trim(dpp, shard_id, c); |
f67539c2 TL |
804 | if (r == -ENOENT) |
805 | r = -ENODATA; | |
806 | if (r == -ENODATA && be->gen_id < target_gen) | |
807 | r = 0; | |
522d829b TL |
808 | if (be->gen_id == target_gen) |
809 | break; | |
f67539c2 TL |
810 | l.lock(); |
811 | }; | |
812 | return r; | |
813 | } | |
814 | ||
b3b6e05e | 815 | int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker) |
f67539c2 TL |
816 | { |
817 | assert(shard_id < num_shards); | |
b3b6e05e | 818 | return bes->trim_entries(dpp, shard_id, marker); |
f67539c2 TL |
819 | } |
820 | ||
821 | class GenTrim : public rgw::cls::fifo::Completion<GenTrim> { | |
822 | public: | |
823 | DataLogBackends* const bes; | |
824 | const int shard_id; | |
825 | const uint64_t target_gen; | |
826 | const std::string cursor; | |
827 | const uint64_t head_gen; | |
828 | const uint64_t tail_gen; | |
829 | boost::intrusive_ptr<RGWDataChangesBE> be; | |
830 | ||
b3b6e05e | 831 | GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id, uint64_t target_gen, |
f67539c2 TL |
832 | std::string cursor, uint64_t head_gen, uint64_t tail_gen, |
833 | boost::intrusive_ptr<RGWDataChangesBE> be, | |
834 | lr::AioCompletion* super) | |
b3b6e05e | 835 | : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen), |
f67539c2 TL |
836 | cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen), |
837 | be(std::move(be)) {} | |
838 | ||
b3b6e05e | 839 | void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) { |
f67539c2 TL |
840 | auto gen_id = be->gen_id; |
841 | be.reset(); | |
842 | if (r == -ENOENT) | |
843 | r = -ENODATA; | |
844 | if (r == -ENODATA && gen_id < target_gen) | |
845 | r = 0; | |
846 | if (r < 0) { | |
847 | complete(std::move(p), r); | |
848 | return; | |
849 | } | |
850 | ||
851 | { | |
852 | std::unique_lock l(bes->m); | |
853 | auto i = bes->upper_bound(gen_id); | |
854 | if (i == bes->end() || i->first > target_gen || i->first > head_gen) { | |
855 | l.unlock(); | |
856 | complete(std::move(p), -ENODATA); | |
857 | return; | |
858 | } | |
859 | be = i->second; | |
860 | } | |
861 | auto c = be->gen_id == target_gen ? cursor : be->max_marker(); | |
b3b6e05e | 862 | be->trim(dpp, shard_id, c, call(std::move(p))); |
f67539c2 TL |
863 | } |
864 | }; | |
865 | ||
b3b6e05e | 866 | void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, |
f67539c2 TL |
867 | librados::AioCompletion* c) |
868 | { | |
869 | auto [target_gen, cursor] = cursorgen(marker); | |
870 | std::unique_lock l(m); | |
871 | const auto head_gen = (end() - 1)->second->gen_id; | |
872 | const auto tail_gen = begin()->first; | |
873 | if (target_gen < tail_gen) { | |
874 | l.unlock(); | |
875 | rgw_complete_aio_completion(c, -ENODATA); | |
876 | return; | |
877 | } | |
878 | auto be = begin()->second; | |
879 | l.unlock(); | |
b3b6e05e | 880 | auto gt = std::make_unique<GenTrim>(dpp, this, shard_id, target_gen, |
f67539c2 TL |
881 | std::string(cursor), head_gen, tail_gen, |
882 | be, c); | |
883 | ||
884 | auto cc = be->gen_id == target_gen ? cursor : be->max_marker(); | |
b3b6e05e | 885 | be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt))); |
f67539c2 TL |
886 | } |
887 | ||
b3b6e05e | 888 | int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) { |
f67539c2 TL |
889 | if (size() != 1) { |
890 | std::vector<mapped_type> candidates; | |
891 | { | |
892 | std::scoped_lock l(m); | |
893 | auto e = cend() - 1; | |
894 | for (auto i = cbegin(); i < e; ++i) { | |
895 | candidates.push_back(i->second); | |
896 | } | |
897 | } | |
898 | ||
899 | std::optional<uint64_t> highest; | |
900 | for (auto& be : candidates) { | |
b3b6e05e | 901 | auto r = be->is_empty(dpp); |
f67539c2 TL |
902 | if (r < 0) { |
903 | return r; | |
904 | } else if (r == 1) { | |
905 | highest = be->gen_id; | |
906 | } else { | |
907 | break; | |
908 | } | |
909 | } | |
910 | ||
911 | through = highest; | |
912 | if (!highest) { | |
913 | return 0; | |
914 | } | |
b3b6e05e | 915 | auto ec = empty_to(dpp, *highest, null_yield); |
f67539c2 TL |
916 | if (ec) { |
917 | return ceph::from_error_code(ec); | |
918 | } | |
919 | } | |
920 | ||
b3b6e05e | 921 | return ceph::from_error_code(remove_empty(dpp, null_yield)); |
f67539c2 TL |
922 | } |
923 | ||
924 | ||
b3b6e05e | 925 | int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker, |
f67539c2 TL |
926 | librados::AioCompletion* c) |
927 | { | |
928 | assert(shard_id < num_shards); | |
b3b6e05e | 929 | bes->trim_entries(dpp, shard_id, marker, c); |
f67539c2 TL |
930 | return 0; |
931 | } | |
932 | ||
933 | bool RGWDataChangesLog::going_down() const | |
934 | { | |
935 | return down_flag; | |
936 | } | |
937 | ||
938 | RGWDataChangesLog::~RGWDataChangesLog() { | |
939 | down_flag = true; | |
940 | if (renew_thread.joinable()) { | |
941 | renew_stop(); | |
942 | renew_thread.join(); | |
943 | } | |
944 | } | |
945 | ||
20effc67 | 946 | void RGWDataChangesLog::renew_run() noexcept { |
f67539c2 TL |
947 | static constexpr auto runs_per_prune = 150; |
948 | auto run = 0; | |
949 | for (;;) { | |
b3b6e05e TL |
950 | const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: "); |
951 | ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl; | |
952 | int r = renew_entries(&dp); | |
f67539c2 | 953 | if (r < 0) { |
b3b6e05e | 954 | ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl; |
f67539c2 TL |
955 | } |
956 | ||
957 | if (going_down()) | |
958 | break; | |
959 | ||
960 | if (run == runs_per_prune) { | |
961 | std::optional<uint64_t> through; | |
b3b6e05e TL |
962 | ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl; |
963 | trim_generations(&dp, through); | |
f67539c2 TL |
964 | if (r < 0) { |
965 | derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r=" | |
966 | << r << dendl; | |
967 | } else if (through) { | |
b3b6e05e | 968 | ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations " |
f67539c2 TL |
969 | << "through " << *through << "." << dendl; |
970 | } else { | |
b3b6e05e | 971 | ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune." |
f67539c2 TL |
972 | << dendl; |
973 | } | |
974 | run = 0; | |
975 | } else { | |
976 | ++run; | |
977 | } | |
978 | ||
979 | int interval = cct->_conf->rgw_data_log_window * 3 / 4; | |
980 | std::unique_lock locker{renew_lock}; | |
981 | renew_cond.wait_for(locker, std::chrono::seconds(interval)); | |
982 | } | |
983 | } | |
984 | ||
985 | void RGWDataChangesLog::renew_stop() | |
986 | { | |
987 | std::lock_guard l{renew_lock}; | |
988 | renew_cond.notify_all(); | |
989 | } | |
990 | ||
991 | void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs) | |
992 | { | |
20effc67 TL |
993 | if (!cct->_conf->rgw_data_notify_interval_msec) { |
994 | return; | |
995 | } | |
996 | ||
f67539c2 TL |
997 | auto key = bs.get_key(); |
998 | { | |
999 | std::shared_lock rl{modified_lock}; // read lock to check for existence | |
1000 | auto shard = modified_shards.find(shard_id); | |
1001 | if (shard != modified_shards.end() && shard->second.count(key)) { | |
1002 | return; | |
1003 | } | |
1004 | } | |
1005 | ||
1006 | std::unique_lock wl{modified_lock}; // write lock for insertion | |
1007 | modified_shards[shard_id].insert(key); | |
1008 | } | |
1009 | ||
1010 | std::string RGWDataChangesLog::max_marker() const { | |
1011 | return gencursor(std::numeric_limits<uint64_t>::max(), | |
1012 | "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); | |
1013 | } | |
1014 | ||
b3b6e05e TL |
1015 | int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) { |
1016 | return ceph::from_error_code(bes->new_backing(dpp, type, y)); | |
f67539c2 TL |
1017 | } |
1018 | ||
b3b6e05e TL |
1019 | int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) { |
1020 | return bes->trim_generations(dpp, through); | |
f67539c2 | 1021 | } |
20effc67 TL |
1022 | |
1023 | void RGWDataChangesLogInfo::dump(Formatter *f) const | |
1024 | { | |
1025 | encode_json("marker", marker, f); | |
1026 | utime_t ut(last_update); | |
1027 | encode_json("last_update", ut, f); | |
1028 | } | |
1029 | ||
1030 | void RGWDataChangesLogInfo::decode_json(JSONObj *obj) | |
1031 | { | |
1032 | JSONDecoder::decode_json("marker", marker, obj); | |
1033 | utime_t ut; | |
1034 | JSONDecoder::decode_json("last_update", ut, obj); | |
1035 | last_update = ut.to_real_time(); | |
1036 | } | |
1037 | ||
1038 |