// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <vector>

#include "common/debug.h"
#include "common/containers.h"
#include "common/errno.h"
#include "common/error_code.h"

#include "common/async/blocked_completion.h"
#include "common/async/librados_completion.h"

#include "cls/fifo/cls_fifo_types.h"
#include "cls/log/cls_log_client.h"

#include "cls_fifo_legacy.h"
#include "rgw_datalog.h"
#include "rgw_log_backing.h"
#include "rgw_tools.h"

#define dout_context g_ceph_context
static constexpr auto dout_subsys = ceph_subsys_rgw;

namespace bs = boost::system;
namespace lr = librados;

using ceph::containers::tiny_vector;

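// JSON (de)serialization for a single data-change record. Only
// ENTITY_TYPE_BUCKET round-trips; any other entity type is rendered as
// "unknown" and decodes back to ENTITY_TYPE_UNKNOWN.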
void rgw_data_change::dump(ceph::Formatter *f) const
{
  std::string type;
  switch (entity_type) {
  case ENTITY_TYPE_BUCKET:
    type = "bucket";
    break;
  default:
    type = "unknown";
  }
  encode_json("entity_type", type, f);
  encode_json("key", key, f);
  utime_t ut(timestamp);
  encode_json("timestamp", ut, f);
}

void rgw_data_change::decode_json(JSONObj *obj) {
  std::string s;
  JSONDecoder::decode_json("entity_type", s, obj);
  if (s == "bucket") {
    entity_type = ENTITY_TYPE_BUCKET;
  } else {
    entity_type = ENTITY_TYPE_UNKNOWN;
  }
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_data_change_log_entry::dump(Formatter *f) const
{
  encode_json("log_id", log_id, f);
  utime_t ut(log_timestamp);
  encode_json("log_timestamp", ut, f);
  encode_json("entry", entry, f);
}

void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("log_id", log_id, obj);
  utime_t ut;
  JSONDecoder::decode_json("log_timestamp", ut, obj);
  log_timestamp = ut.to_real_time();
  JSONDecoder::decode_json("entry", entry, obj);
}

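// Datalog backend over cls_log omap objects (the older format): one log
// object per shard, appended to with cls_log_add and read back with
// cls_log_list.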
class RGWDataChangesOmap final : public RGWDataChangesBE {
  using centries = std::list<cls_log_entry>;
  std::vector<std::string> oids;

public:
  RGWDataChangesOmap(lr::IoCtx& ioctx,
                     RGWDataChangesLog& datalog,
                     uint64_t gen_id,
                     int num_shards)
    : RGWDataChangesBE(ioctx, datalog, gen_id) {
    oids.reserve(num_shards);
    for (auto i = 0; i < num_shards; ++i) {
      oids.push_back(get_oid(i));
    }
  }
  ~RGWDataChangesOmap() override = default;

  void prepare(ceph::real_time ut, const std::string& key,
               ceph::buffer::list&& entry, entries& out) override {
    if (!std::holds_alternative<centries>(out)) {
      ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
      out = centries();
    }

    cls_log_entry e;
    cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
    std::get<centries>(out).push_back(std::move(e));
  }
  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
    lr::ObjectWriteOperation op;
    cls_log_add(op, std::get<centries>(items), true);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to push to " << oids[index] << cpp_strerror(-r)
                         << dendl;
    }
    return r;
  }
  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
           const std::string& key,
           ceph::buffer::list&& bl) override {
    lr::ObjectWriteOperation op;
    cls_log_add(op, utime_t(now), {}, key, bl);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to push to " << oids[index]
                         << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
           std::optional<std::string_view> marker,
           std::string* out_marker, bool* truncated) override {
    std::list<cls_log_entry> log_entries;
    lr::ObjectReadOperation op;
    cls_log_list(op, {}, {}, std::string(marker.value_or("")),
                 max_entries, log_entries, out_marker, truncated);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
    if (r == -ENOENT) {
      *truncated = false;
      return 0;
    }
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to list " << oids[index]
                         << cpp_strerror(-r) << dendl;
      return r;
    }
    for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
      rgw_data_change_log_entry log_entry;
      log_entry.log_id = iter->id;
      auto rt = iter->timestamp.to_real_time();
      log_entry.log_timestamp = rt;
      auto liter = iter->data.cbegin();
      try {
        decode(log_entry.entry, liter);
      } catch (ceph::buffer::error& err) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": failed to decode data changes log entry: "
                           << err.what() << dendl;
        return -EIO;
      }
      entries.push_back(log_entry);
    }
    return 0;
  }
  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
    cls_log_header header;
    lr::ObjectReadOperation op;
    cls_log_info(op, &header);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
    if (r == -ENOENT) r = 0;
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to get info from " << oids[index]
                         << cpp_strerror(-r) << dendl;
    } else {
      info->marker = header.max_marker;
      info->last_update = header.max_time.to_real_time();
    }
    return r;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
    lr::ObjectWriteOperation op;
    cls_log_trim(op, {}, {}, {}, std::string(marker));
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
    if (r == -ENOENT) r = -ENODATA;
    if (r < 0 && r != -ENODATA) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to trim " << oids[index]
                         << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
           lr::AioCompletion* c) override {
    lr::ObjectWriteOperation op;
    cls_log_trim(op, {}, {}, {}, std::string(marker));
    auto r = ioctx.aio_operate(oids[index], c, &op, 0);
    if (r == -ENOENT) r = -ENODATA;
    if (r < 0) {
      lderr(cct) << __PRETTY_FUNCTION__
                 << ": failed to trim " << oids[index]
                 << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  std::string_view max_marker() const override {
    return "99999999"sv;
  }
  int is_empty(const DoutPrefixProvider *dpp) override {
    for (auto shard = 0u; shard < oids.size(); ++shard) {
      std::list<cls_log_entry> log_entries;
      lr::ObjectReadOperation op;
      std::string out_marker;
      bool truncated;
      cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
      auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield);
      if (r == -ENOENT) {
        continue;
      }
      if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": failed to list " << oids[shard]
                           << cpp_strerror(-r) << dendl;
        return r;
      }
      if (!log_entries.empty()) {
        return 0;
      }
    }
    return 1;
  }
};

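// Datalog backend over per-shard FIFOs, opened lazily on first use.
// Markers here are part/offset pairs (rgw::cls::fifo::marker) rather than
// the timestamp-derived ids used by the omap backend.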
class RGWDataChangesFIFO final : public RGWDataChangesBE {
  using centries = std::vector<ceph::buffer::list>;
  tiny_vector<LazyFIFO> fifos;

public:
  RGWDataChangesFIFO(lr::IoCtx& ioctx,
                     RGWDataChangesLog& datalog,
                     uint64_t gen_id, int shards)
    : RGWDataChangesBE(ioctx, datalog, gen_id),
      fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) {
        emplacer.emplace(ioctx, get_oid(i));
      }) {}
  ~RGWDataChangesFIFO() override = default;
  void prepare(ceph::real_time, const std::string&,
               ceph::buffer::list&& entry, entries& out) override {
    if (!std::holds_alternative<centries>(out)) {
      ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
      out = centries();
    }
    std::get<centries>(out).push_back(std::move(entry));
  }
  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
    auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to push to FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
           const std::string&,
           ceph::buffer::list&& bl) override {
    auto r = fifos[index].push(dpp, std::move(bl), null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to push to FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
           std::optional<std::string_view> marker,
           std::string* out_marker, bool* truncated) override {
    std::vector<rgw::cls::fifo::list_entry> log_entries;
    bool more = false;
    auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
                               null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to list FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    for (const auto& entry : log_entries) {
      rgw_data_change_log_entry log_entry;
      log_entry.log_id = entry.marker;
      log_entry.log_timestamp = entry.mtime;
      auto liter = entry.data.cbegin();
      try {
        decode(log_entry.entry, liter);
      } catch (const buffer::error& err) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": failed to decode data changes log entry: "
                           << err.what() << dendl;
        return -EIO;
      }
      entries.push_back(std::move(log_entry));
    }
    if (truncated)
      *truncated = more;
    if (out_marker && !log_entries.empty()) {
      *out_marker = log_entries.back().marker;
    }
    return 0;
  }
  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
    auto& fifo = fifos[index];
    auto r = fifo.read_meta(dpp, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to get FIFO metadata: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    rados::cls::fifo::info m;
    fifo.meta(dpp, m, null_yield);
    auto p = m.head_part_num;
    if (p < 0) {
      info->marker = ""s;
      info->last_update = ceph::real_clock::zero();
      return 0;
    }
    rgw::cls::fifo::part_info h;
    r = fifo.get_part_info(dpp, p, &h, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to get part info: " << get_oid(index) << "/" << p
                         << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
    info->last_update = h.max_time;
    return 0;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
    auto r = fifos[index].trim(dpp, marker, false, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to trim FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
           librados::AioCompletion* c) override {
    int r = 0;
    if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
      rgw_complete_aio_completion(c, -ENODATA);
    } else {
      fifos[index].trim(dpp, marker, false, c, null_yield);
    }
    return r;
  }
  std::string_view max_marker() const override {
    static const std::string mm =
      rgw::cls::fifo::marker::max().to_string();
    return std::string_view(mm);
  }
  int is_empty(const DoutPrefixProvider *dpp) override {
    std::vector<rgw::cls::fifo::list_entry> log_entries;
    bool more = false;
    for (auto shard = 0u; shard < fifos.size(); ++shard) {
      auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more,
                                 null_yield);
      if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": unable to list FIFO: " << get_oid(shard)
                           << ": " << cpp_strerror(-r) << dendl;
        return r;
      }
      if (!log_entries.empty()) {
        return 0;
      }
    }
    return 1;
  }
};

RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
  : cct(cct),
    num_shards(cct->_conf->rgw_data_log_num_shards),
    prefix(get_prefix()),
    changes(cct->_conf->rgw_data_log_changes_size) {}

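// DataLogBackends maps generation ids to backend instances. handle_init()
// receives the generations recorded by logback_generations and
// instantiates an omap or FIFO backend for each, depending on its type.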
bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
  std::unique_lock l(m);

  for (const auto& [gen_id, gen] : e) {
    if (gen.pruned) {
      lderr(datalog.cct)
        << __PRETTY_FUNCTION__ << ":" << __LINE__
        << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
    }
    if (count(gen_id) != 0) {
      lderr(datalog.cct)
        << __PRETTY_FUNCTION__ << ":" << __LINE__
        << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
    }
    try {
      switch (gen.type) {
      case log_type::omap:
        emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
        break;
      case log_type::fifo:
        emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
        break;
      default:
        lderr(datalog.cct)
          << __PRETTY_FUNCTION__ << ":" << __LINE__
          << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
          << ", type=" << gen.type << dendl;
        return bs::error_code(EFAULT, bs::system_category());
      }
    } catch (const bs::system_error& err) {
      lderr(datalog.cct)
        << __PRETTY_FUNCTION__ << ":" << __LINE__
        << ": error setting up backend: gen_id=" << gen_id
        << ", err=" << err.what() << dendl;
      return err.code();
    }
  }
  return {};
}
bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
  return handle_init(std::move(e));
}
bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
  std::unique_lock l(m);
  auto i = cbegin();
  if (i->first < new_tail) {
    return {};
  }
  if (new_tail >= (cend() - 1)->first) {
    lderr(datalog.cct)
      << __PRETTY_FUNCTION__ << ":" << __LINE__
      << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
    return bs::error_code(EFAULT, bs::system_category());
  }
  erase(i, upper_bound(new_tail));
  return {};
}


int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                             const RGWZone* _zone,
                             const RGWZoneParams& zoneparams,
                             librados::Rados* lr)
{
  zone = _zone;
  ceph_assert(zone);
  auto defbacking = to_log_type(
    cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
  // Should be guaranteed by `set_enum_allowed`
  ceph_assert(defbacking);
  auto log_pool = zoneparams.log_pool;
  auto r = rgw_init_ioctx(dpp, lr, log_pool, ioctx, true, false);
  if (r < 0) {
    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Failed to initialize ioctx, r=" << r
                       << ", pool=" << log_pool << dendl;
    return r;
  }

  auto besr = logback_generations::init<DataLogBackends>(
    dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
      return get_oid(gen_id, shard);
    },
    num_shards, *defbacking, null_yield, *this);


  if (!besr) {
    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Error initializing backends: "
                       << besr.error().message() << dendl;
    return ceph::from_error_code(besr.error());
  }

  bes = std::move(*besr);
  renew_thread = make_named_thread("rgw_dt_lg_renew",
                                   &RGWDataChangesLog::renew_run, this);
  return 0;
}

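// Map a bucket shard to a datalog shard: hash the bucket name, add the
// (non-negative) shard id, and reduce modulo rgw_data_log_num_shards.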
int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
  const auto& name = bs.bucket.name;
  auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
  auto r = (ceph_str_hash_linux(name.data(), name.size()) +
            shard_shift) % num_shards;
  return static_cast<int>(r);
}

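// Re-log every bucket shard touched during the last cycle so its entry
// stays current within rgw_data_log_window. Entries are batched per
// datalog shard and pushed to the head generation in one call each.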
int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
{
  if (!zone->log_data)
    return 0;

  /* we can't keep the bucket name as part of the cls_log_entry, and we need
   * it later, so we keep two lists under the map */
  bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
                              RGWDataChangesBE::entries>> m;

  std::unique_lock l(lock);
  decltype(cur_cycle) entries;
  entries.swap(cur_cycle);
  l.unlock();

  auto ut = real_clock::now();
  auto be = bes->head();
  for (const auto& bs : entries) {
    auto index = choose_oid(bs);

    rgw_data_change change;
    bufferlist bl;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = ut;
    encode(change, bl);

    m[index].first.push_back(bs);
    be->prepare(ut, change.key, std::move(bl), m[index].second);
  }

  for (auto& [index, p] : m) {
    auto& [buckets, entries] = p;

    auto now = real_clock::now();

    auto ret = be->push(dpp, index, std::move(entries));
    if (ret < 0) {
      /* we don't really need to have a special handling for failed cases here,
       * as this is just an optimization. */
      ldpp_dout(dpp, -1) << "ERROR: be->push() returned " << ret << dendl;
      return ret;
    }

    auto expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
    for (auto& bs : buckets) {
      update_renewed(bs, expiration);
    }
  }

  return 0;
}

void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
                                    ChangeStatusPtr& status)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (!changes.find(bs, status)) {
    status = ChangeStatusPtr(new ChangeStatus);
    changes.add(bs, status);
  }
}

void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
{
  std::scoped_lock l{lock};
  cur_cycle.insert(bs);
}

void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
                                       real_time expiration)
{
  std::scoped_lock l{lock};
  ChangeStatusPtr status;
  _get_change(bs, status);

  ldout(cct, 20) << "RGWDataChangesLog::update_renewed() bucket_name="
                 << bs.bucket.name << " shard_id=" << bs.shard_id
                 << " expiration=" << expiration << dendl;
  status->cur_expiration = expiration;
}

int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
  rgw_bucket_shard bs(bucket, shard_id);
  return choose_oid(bs);
}

bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
                                      const rgw_bucket& bucket,
                                      optional_yield y) const
{
  if (!bucket_filter) {
    return true;
  }

  return bucket_filter(bucket, y, dpp);
}

std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
  return (gen_id > 0 ?
          fmt::format("{}@G{}.{}", prefix, gen_id, i) :
          fmt::format("{}.{}", prefix, i));
}

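// Log a change for one bucket shard. Writes are deduplicated within the
// configured window: if an entry was sent recently the shard is only
// registered for the next renew cycle, and if a write is already pending
// we wait on its RefCountedCond rather than issuing another one.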
int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
  auto& bucket = bucket_info.bucket;

  if (!filter_bucket(dpp, bucket, null_yield)) {
    return 0;
  }

  if (observer) {
    observer->on_bucket_changed(bucket.get_key());
  }

  rgw_bucket_shard bs(bucket, shard_id);

  int index = choose_oid(bs);
  mark_modified(index, bs);

  std::unique_lock l(lock);

  ChangeStatusPtr status;
  _get_change(bs, status);
  l.unlock();

  auto now = real_clock::now();

  std::unique_lock sl(status->lock);

  ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
                     << " shard_id=" << shard_id << " now=" << now
                     << " cur_expiration=" << status->cur_expiration << dendl;

  if (now < status->cur_expiration) {
    /* no need to send, recently completed */
    sl.unlock();
    register_renew(bs);
    return 0;
  }

  RefCountedCond* cond;

  if (status->pending) {
    cond = status->cond;

    ceph_assert(cond);

    status->cond->get();
    sl.unlock();

    int ret = cond->wait();
    cond->put();
    if (!ret) {
      register_renew(bs);
    }
    return ret;
  }

  status->cond = new RefCountedCond;
  status->pending = true;

  ceph::real_time expiration;

  int ret;

  do {
    status->cur_sent = now;

    expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);

    sl.unlock();

    ceph::buffer::list bl;
    rgw_data_change change;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = now;
    encode(change, bl);

    ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;

    auto be = bes->head();
    ret = be->push(dpp, index, now, change.key, std::move(bl));

    now = real_clock::now();

    sl.lock();

  } while (!ret && real_clock::now() > expiration);

  cond = status->cond;

  status->pending = false;
  /* time of when operation started, not completed */
  status->cur_expiration = status->cur_sent;
  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
  status->cond = nullptr;
  sl.unlock();

  cond->done(ret);
  cond->put();

  return ret;
}

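// List entries across generations. Returned log ids and the out marker
// are prefixed with the generation id via gencursor(), so a caller can
// resume against the right backend; cursorgeno() reverses the mapping for
// the start marker.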
int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries,
                          std::vector<rgw_data_change_log_entry>& entries,
                          std::optional<std::string_view> marker,
                          std::string* out_marker, bool* truncated)
{
  const auto [start_id, start_cursor] = cursorgeno(marker);
  auto gen_id = start_id;
  std::string out_cursor;
  while (max_entries > 0) {
    std::vector<rgw_data_change_log_entry> gentries;
    std::unique_lock l(m);
    auto i = lower_bound(gen_id);
    if (i == end()) return 0;
    auto be = i->second;
    l.unlock();
    gen_id = be->gen_id;
    auto r = be->list(dpp, shard, max_entries, gentries,
                      gen_id == start_id ? start_cursor : std::string{},
                      &out_cursor, truncated);
    if (r < 0)
      return r;

    if (out_marker && !out_cursor.empty()) {
      *out_marker = gencursor(gen_id, out_cursor);
    }
    for (auto& g : gentries) {
      g.log_id = gencursor(gen_id, g.log_id);
    }
    if (gentries.size() > static_cast<std::size_t>(max_entries))
      max_entries = 0;
    else
      max_entries -= gentries.size();

    std::move(gentries.begin(), gentries.end(),
              std::back_inserter(entries));
    ++gen_id;
  }
  return 0;
}

int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
                                    std::optional<std::string_view> marker,
                                    std::string* out_marker, bool* truncated)
{
  assert(shard < num_shards);
  return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated);
}

int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
                                    LogMarker& marker, bool *ptruncated)
{
  bool truncated;
  entries.clear();
  for (; marker.shard < num_shards && int(entries.size()) < max_entries;
       marker.shard++, marker.marker.reset()) {
    int ret = list_entries(dpp, marker.shard, max_entries - entries.size(),
                           entries, marker.marker, NULL, &truncated);
    if (ret == -ENOENT) {
      continue;
    }
    if (ret < 0) {
      return ret;
    }
    if (truncated) {
      *ptruncated = true;
      return 0;
    }
  }
  *ptruncated = (marker.shard < num_shards);
  return 0;
}

int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info)
{
  assert(shard_id < num_shards);
  auto be = bes->head();
  auto r = be->get_info(dpp, shard_id, info);
  if (!info->marker.empty()) {
    info->marker = gencursor(be->gen_id, info->marker);
  }
  return r;
}

int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
{
  auto [target_gen, cursor] = cursorgen(marker);
  std::unique_lock l(m);
  const auto head_gen = (end() - 1)->second->gen_id;
  const auto tail_gen = begin()->first;
  if (target_gen < tail_gen) return 0;
  auto r = 0;
  for (auto be = lower_bound(0)->second;
       be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
       be = upper_bound(be->gen_id)->second) {
    l.unlock();
    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
    r = be->trim(dpp, shard_id, c);
    if (r == -ENOENT)
      r = -ENODATA;
    if (r == -ENODATA && be->gen_id < target_gen)
      r = 0;
    l.lock();
  }
  return r;
}

int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
{
  assert(shard_id < num_shards);
  return bes->trim_entries(dpp, shard_id, marker);
}

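// Async trim across generations: each completion trims one backend, then
// re-arms itself (via call()) on the next generation until the target or
// head generation is reached, at which point the wrapped AioCompletion
// fires.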
class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
public:
  DataLogBackends* const bes;
  const int shard_id;
  const uint64_t target_gen;
  const std::string cursor;
  const uint64_t head_gen;
  const uint64_t tail_gen;
  boost::intrusive_ptr<RGWDataChangesBE> be;

  GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id, uint64_t target_gen,
          std::string cursor, uint64_t head_gen, uint64_t tail_gen,
          boost::intrusive_ptr<RGWDataChangesBE> be,
          lr::AioCompletion* super)
    : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen),
      cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
      be(std::move(be)) {}

  void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
    auto gen_id = be->gen_id;
    be.reset();
    if (r == -ENOENT)
      r = -ENODATA;
    if (r == -ENODATA && gen_id < target_gen)
      r = 0;
    if (r < 0) {
      complete(std::move(p), r);
      return;
    }

    {
      std::unique_lock l(bes->m);
      auto i = bes->upper_bound(gen_id);
      if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
        l.unlock();
        complete(std::move(p), -ENODATA);
        return;
      }
      be = i->second;
    }
    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
    be->trim(dpp, shard_id, c, call(std::move(p)));
  }
};

void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
                                   librados::AioCompletion* c)
{
  auto [target_gen, cursor] = cursorgen(marker);
  std::unique_lock l(m);
  const auto head_gen = (end() - 1)->second->gen_id;
  const auto tail_gen = begin()->first;
  if (target_gen < tail_gen) {
    l.unlock();
    rgw_complete_aio_completion(c, -ENODATA);
    return;
  }
  auto be = begin()->second;
  l.unlock();
  auto gt = std::make_unique<GenTrim>(dpp, this, shard_id, target_gen,
                                      std::string(cursor), head_gen, tail_gen,
                                      be, c);

  auto cc = be->gen_id == target_gen ? cursor : be->max_marker();
  be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt)));
}

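// Find the highest non-head generation whose shards are all empty;
// everything up to it can be retired with empty_to() and then physically
// deleted with remove_empty().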
int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
  if (size() != 1) {
    std::vector<mapped_type> candidates;
    {
      std::scoped_lock l(m);
      auto e = cend() - 1;
      for (auto i = cbegin(); i < e; ++i) {
        candidates.push_back(i->second);
      }
    }

    std::optional<uint64_t> highest;
    for (auto& be : candidates) {
      auto r = be->is_empty(dpp);
      if (r < 0) {
        return r;
      } else if (r == 1) {
        highest = be->gen_id;
      } else {
        break;
      }
    }

    through = highest;
    if (!highest) {
      return 0;
    }
    auto ec = empty_to(dpp, *highest, null_yield);
    if (ec) {
      return ceph::from_error_code(ec);
    }
  }

  return ceph::from_error_code(remove_empty(dpp, null_yield));
}


int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
                                    librados::AioCompletion* c)
{
  assert(shard_id < num_shards);
  bes->trim_entries(dpp, shard_id, marker, c);
  return 0;
}

bool RGWDataChangesLog::going_down() const
{
  return down_flag;
}

RGWDataChangesLog::~RGWDataChangesLog() {
  down_flag = true;
  if (renew_thread.joinable()) {
    renew_stop();
    renew_thread.join();
  }
}

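// Body of the rgw_dt_lg_renew thread: renew entries every
// 3/4 * rgw_data_log_window seconds, and on every 150th pass
// (runs_per_prune) prune generations that trimming has emptied.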
void RGWDataChangesLog::renew_run() {
  static constexpr auto runs_per_prune = 150;
  auto run = 0;
  for (;;) {
    const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
    ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
    int r = renew_entries(&dp);
    if (r < 0) {
      ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
    }

    if (going_down())
      break;

    if (run == runs_per_prune) {
      std::optional<uint64_t> through;
      ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
      r = trim_generations(&dp, through);
      if (r < 0) {
        derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
             << r << dendl;
      } else if (through) {
        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
                          << "through " << *through << "." << dendl;
      } else {
        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
                          << dendl;
      }
      run = 0;
    } else {
      ++run;
    }

    int interval = cct->_conf->rgw_data_log_window * 3 / 4;
    std::unique_lock locker{renew_lock};
    renew_cond.wait_for(locker, std::chrono::seconds(interval));
  }
}

void RGWDataChangesLog::renew_stop()
{
  std::lock_guard l{renew_lock};
  renew_cond.notify_all();
}

void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
{
  auto key = bs.get_key();
  {
    std::shared_lock rl{modified_lock}; // read lock to check for existence
    auto shard = modified_shards.find(shard_id);
    if (shard != modified_shards.end() && shard->second.count(key)) {
      return;
    }
  }

  std::unique_lock wl{modified_lock}; // write lock for insertion
  modified_shards[shard_id].insert(key);
}

std::string RGWDataChangesLog::max_marker() const {
  return gencursor(std::numeric_limits<uint64_t>::max(),
                   "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}

int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) {
  return ceph::from_error_code(bes->new_backing(dpp, type, y));
}

int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
  return bes->trim_generations(dpp, through);
}