// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <vector>

#include "common/debug.h"
#include "common/containers.h"
#include "common/errno.h"
#include "common/error_code.h"

#include "common/async/blocked_completion.h"
#include "common/async/librados_completion.h"

#include "cls/fifo/cls_fifo_types.h"
#include "cls/log/cls_log_client.h"

#include "cls_fifo_legacy.h"
#include "rgw_datalog.h"
#include "rgw_log_backing.h"
#include "rgw_tools.h"

#define dout_context g_ceph_context
static constexpr auto dout_subsys = ceph_subsys_rgw;

namespace bs = boost::system;
namespace lr = librados;

// Bring in the ""s / ""sv string literals used below.
using namespace std::literals;

using ceph::containers::tiny_vector;

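// JSON round-trip helpers for rgw_data_change and rgw_data_change_log_entry.
// Timestamps pass through utime_t so the encoded form matches the rest of
// RGW's REST/admin output.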
void rgw_data_change::dump(ceph::Formatter *f) const
{
  std::string type;
  switch (entity_type) {
  case ENTITY_TYPE_BUCKET:
    type = "bucket";
    break;
  default:
    type = "unknown";
  }
  encode_json("entity_type", type, f);
  encode_json("key", key, f);
  utime_t ut(timestamp);
  encode_json("timestamp", ut, f);
}

void rgw_data_change::decode_json(JSONObj *obj) {
  std::string s;
  JSONDecoder::decode_json("entity_type", s, obj);
  if (s == "bucket") {
    entity_type = ENTITY_TYPE_BUCKET;
  } else {
    entity_type = ENTITY_TYPE_UNKNOWN;
  }
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_data_change_log_entry::dump(Formatter *f) const
{
  encode_json("log_id", log_id, f);
  utime_t ut(log_timestamp);
  encode_json("log_timestamp", ut, f);
  encode_json("entry", entry, f);
}

void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("log_id", log_id, obj);
  utime_t ut;
  JSONDecoder::decode_json("log_timestamp", ut, obj);
  log_timestamp = ut.to_real_time();
  JSONDecoder::decode_json("entry", entry, obj);
}

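// cls_log (OMAP) backend for the data changes log. Each shard is a RADOS
// object whose OMAP holds cls_log entries; push() appends entries and
// list() pages through them by marker.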
class RGWDataChangesOmap final : public RGWDataChangesBE {
  using centries = std::list<cls_log_entry>;
  std::vector<std::string> oids;

public:
  RGWDataChangesOmap(lr::IoCtx& ioctx,
                     RGWDataChangesLog& datalog,
                     uint64_t gen_id,
                     int num_shards)
    : RGWDataChangesBE(ioctx, datalog, gen_id) {
    oids.reserve(num_shards);
    for (auto i = 0; i < num_shards; ++i) {
      oids.push_back(get_oid(i));
    }
  }
  ~RGWDataChangesOmap() override = default;

  void prepare(ceph::real_time ut, const std::string& key,
               ceph::buffer::list&& entry, entries& out) override {
    if (!std::holds_alternative<centries>(out)) {
      ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
      out = centries();
    }

    cls_log_entry e;
    cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
    std::get<centries>(out).push_back(std::move(e));
  }
  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
    lr::ObjectWriteOperation op;
    cls_log_add(op, std::get<centries>(items), true);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to push to " << oids[index] << cpp_strerror(-r)
                         << dendl;
    }
    return r;
  }
  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
           const std::string& key,
           ceph::buffer::list&& bl) override {
    lr::ObjectWriteOperation op;
    cls_log_add(op, utime_t(now), {}, key, bl);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to push to " << oids[index]
                         << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
           std::optional<std::string_view> marker,
           std::string* out_marker, bool* truncated) override {
    std::list<cls_log_entry> log_entries;
    lr::ObjectReadOperation op;
    cls_log_list(op, {}, {}, std::string(marker.value_or("")),
                 max_entries, log_entries, out_marker, truncated);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
    if (r == -ENOENT) {
      *truncated = false;
      return 0;
    }
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to list " << oids[index]
                         << cpp_strerror(-r) << dendl;
      return r;
    }
    for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
      rgw_data_change_log_entry log_entry;
      log_entry.log_id = iter->id;
      auto rt = iter->timestamp.to_real_time();
      log_entry.log_timestamp = rt;
      auto liter = iter->data.cbegin();
      try {
        decode(log_entry.entry, liter);
      } catch (ceph::buffer::error& err) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": failed to decode data changes log entry: "
                           << err.what() << dendl;
        return -EIO;
      }
      entries.push_back(log_entry);
    }
    return 0;
  }
  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
    cls_log_header header;
    lr::ObjectReadOperation op;
    cls_log_info(op, &header);
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
    if (r == -ENOENT) r = 0;
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to get info from " << oids[index]
                         << cpp_strerror(-r) << dendl;
    } else {
      info->marker = header.max_marker;
      info->last_update = header.max_time.to_real_time();
    }
    return r;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
    lr::ObjectWriteOperation op;
    cls_log_trim(op, {}, {}, {}, std::string(marker));
    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
    if (r == -ENOENT) r = -ENODATA;
    if (r < 0 && r != -ENODATA) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": failed to trim " << oids[index]
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
           lr::AioCompletion* c) override {
    lr::ObjectWriteOperation op;
    cls_log_trim(op, {}, {}, {}, std::string(marker));
    auto r = ioctx.aio_operate(oids[index], c, &op, 0);
    if (r == -ENOENT) r = -ENODATA;
    if (r < 0) {
      lderr(cct) << __PRETTY_FUNCTION__
                 << ": failed to trim " << oids[index]
                 << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  std::string_view max_marker() const override {
    return "99999999"sv;
  }
  int is_empty(const DoutPrefixProvider *dpp) override {
    for (auto shard = 0u; shard < oids.size(); ++shard) {
      std::list<cls_log_entry> log_entries;
      lr::ObjectReadOperation op;
      std::string out_marker;
      bool truncated;
      cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
      auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield);
      if (r == -ENOENT) {
        continue;
      }
      if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": failed to list " << oids[shard]
                           << cpp_strerror(-r) << dendl;
        return r;
      }
      if (!log_entries.empty()) {
        return 0;
      }
    }
    return 1;
  }
};

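// FIFO backend for the data changes log. Each shard is a cls_fifo queue,
// opened lazily through LazyFIFO; markers are rgw::cls::fifo::marker values
// rather than cls_log string markers.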
class RGWDataChangesFIFO final : public RGWDataChangesBE {
  using centries = std::vector<ceph::buffer::list>;
  tiny_vector<LazyFIFO> fifos;

public:
  RGWDataChangesFIFO(lr::IoCtx& ioctx,
                     RGWDataChangesLog& datalog,
                     uint64_t gen_id, int shards)
    : RGWDataChangesBE(ioctx, datalog, gen_id),
      fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) {
        emplacer.emplace(ioctx, get_oid(i));
      }) {}
  ~RGWDataChangesFIFO() override = default;
  void prepare(ceph::real_time, const std::string&,
               ceph::buffer::list&& entry, entries& out) override {
    if (!std::holds_alternative<centries>(out)) {
      ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
      out = centries();
    }
    std::get<centries>(out).push_back(std::move(entry));
  }
  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
    auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to push to FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
           const std::string&,
           ceph::buffer::list&& bl) override {
    auto r = fifos[index].push(dpp, std::move(bl), null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to push to FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
           std::optional<std::string_view> marker,
           std::string* out_marker, bool* truncated) override {
    std::vector<rgw::cls::fifo::list_entry> log_entries;
    bool more = false;
    auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
                               null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to list FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    for (const auto& entry : log_entries) {
      rgw_data_change_log_entry log_entry;
      log_entry.log_id = entry.marker;
      log_entry.log_timestamp = entry.mtime;
      auto liter = entry.data.cbegin();
      try {
        decode(log_entry.entry, liter);
      } catch (const buffer::error& err) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": failed to decode data changes log entry: "
                           << err.what() << dendl;
        return -EIO;
      }
      entries.push_back(std::move(log_entry));
    }
    if (truncated)
      *truncated = more;
    if (out_marker && !log_entries.empty()) {
      *out_marker = log_entries.back().marker;
    }
    return 0;
  }
  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
    auto& fifo = fifos[index];
    auto r = fifo.read_meta(dpp, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to get FIFO metadata: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    rados::cls::fifo::info m;
    fifo.meta(dpp, m, null_yield);
    auto p = m.head_part_num;
    if (p < 0) {
      info->marker = ""s;
      info->last_update = ceph::real_clock::zero();
      return 0;
    }
    rgw::cls::fifo::part_info h;
    r = fifo.get_part_info(dpp, p, &h, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to get part info: " << get_oid(index) << "/" << p
                         << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
    info->last_update = h.max_time;
    return 0;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
    auto r = fifos[index].trim(dpp, marker, false, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                         << ": unable to trim FIFO: " << get_oid(index)
                         << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
           librados::AioCompletion* c) override {
    int r = 0;
    if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
      rgw_complete_aio_completion(c, -ENODATA);
    } else {
      fifos[index].trim(dpp, marker, false, c, null_yield);
    }
    return r;
  }
  std::string_view max_marker() const override {
    static const std::string mm =
      rgw::cls::fifo::marker::max().to_string();
    return std::string_view(mm);
  }
  int is_empty(const DoutPrefixProvider *dpp) override {
    std::vector<rgw::cls::fifo::list_entry> log_entries;
    bool more = false;
    for (auto shard = 0u; shard < fifos.size(); ++shard) {
      auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more,
                                 null_yield);
      if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                           << ": unable to list FIFO: " << get_oid(shard)
                           << ": " << cpp_strerror(-r) << dendl;
        return r;
      }
      if (!log_entries.empty()) {
        return 0;
      }
    }
    return 1;
  }
};

RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
  : cct(cct),
    num_shards(cct->_conf->rgw_data_log_num_shards),
    prefix(get_prefix()),
    changes(cct->_conf->rgw_data_log_changes_size) {}

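// logback_generations callbacks: build a backend object for every live
// generation handed to us. handle_init() runs at startup; handle_new_gens()
// applies the same logic when the set of generations changes underneath us.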
bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
  std::unique_lock l(m);

  for (const auto& [gen_id, gen] : e) {
    if (gen.pruned) {
      lderr(datalog.cct)
        << __PRETTY_FUNCTION__ << ":" << __LINE__
        << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
    }
    if (count(gen_id) != 0) {
      lderr(datalog.cct)
        << __PRETTY_FUNCTION__ << ":" << __LINE__
        << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
    }
    try {
      switch (gen.type) {
      case log_type::omap:
        emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
        break;
      case log_type::fifo:
        emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
        break;
      default:
        lderr(datalog.cct)
          << __PRETTY_FUNCTION__ << ":" << __LINE__
          << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
          << ", type=" << gen.type << dendl;
        return bs::error_code(EFAULT, bs::system_category());
      }
    } catch (const bs::system_error& err) {
      lderr(datalog.cct)
        << __PRETTY_FUNCTION__ << ":" << __LINE__
        << ": error setting up backend: gen_id=" << gen_id
        << ", err=" << err.what() << dendl;
      return err.code();
    }
  }
  return {};
}
bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
  return handle_init(std::move(e));
}
bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
  std::unique_lock l(m);
  auto i = cbegin();
  if (i->first < new_tail) {
    return {};
  }
  if (new_tail >= (cend() - 1)->first) {
    lderr(datalog.cct)
      << __PRETTY_FUNCTION__ << ":" << __LINE__
      << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
    return bs::error_code(EFAULT, bs::system_category());
  }
  erase(i, upper_bound(new_tail));
  return {};
}

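// Wire the log up to a running cluster: open an IoCtx on the zone's log
// pool, initialize (or discover) the generations metadata, and start the
// renewal thread.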
int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
                             const RGWZone* _zone,
                             const RGWZoneParams& zoneparams,
                             librados::Rados* lr)
{
  zone = _zone;
  ceph_assert(zone);
  auto defbacking = to_log_type(
    cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
  // Should be guaranteed by `set_enum_allowed`
  ceph_assert(defbacking);
  auto log_pool = zoneparams.log_pool;
  auto r = rgw_init_ioctx(dpp, lr, log_pool, ioctx, true, false);
  if (r < 0) {
    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Failed to initialize ioctx, r=" << r
                       << ", pool=" << log_pool << dendl;
    return r; // keep the negative error code for callers that test r < 0
  }

  auto besr = logback_generations::init<DataLogBackends>(
    dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
      return get_oid(gen_id, shard);
    },
    num_shards, *defbacking, null_yield, *this);

  if (!besr) {
    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Error initializing backends: "
                       << besr.error().message() << dendl;
    return ceph::from_error_code(besr.error());
  }

  bes = std::move(*besr);
  renew_thread = make_named_thread("rgw_dt_lg_renew",
                                   &RGWDataChangesLog::renew_run, this);
  return 0;
}

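// Map a bucket shard to a datalog shard: hash the bucket name, add the
// (non-negative) bucket shard id, and reduce modulo num_shards, so every
// shard of a bucket lands on a stable, roughly uniform datalog shard.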
int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
  const auto& name = bs.bucket.name;
  auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
  auto r = (ceph_str_hash_linux(name.data(), name.size()) +
            shard_shift) % num_shards;
  return static_cast<int>(r);
}

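// Re-log every bucket shard recorded during the last window (cur_cycle).
// Entries are batched per datalog shard, pushed through the head backend,
// and each bucket shard's expiration is then advanced by one window.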
int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
{
  if (!zone->log_data)
    return 0;

  /* we can't keep the bucket name as part of the cls_log_entry, and we need
   * it later, so we keep two lists under the map */
  bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
                              RGWDataChangesBE::entries>> m;

  std::unique_lock l(lock);
  decltype(cur_cycle) entries;
  entries.swap(cur_cycle);
  l.unlock();

  auto ut = real_clock::now();
  auto be = bes->head();
  for (const auto& bs : entries) {
    auto index = choose_oid(bs);

    rgw_data_change change;
    bufferlist bl;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = ut;
    encode(change, bl);

    m[index].first.push_back(bs);
    be->prepare(ut, change.key, std::move(bl), m[index].second);
  }

  for (auto& [index, p] : m) {
    auto& [buckets, entries] = p;

    auto now = real_clock::now();

    auto ret = be->push(dpp, index, std::move(entries));
    if (ret < 0) {
      /* we don't really need to have a special handling for failed cases here,
       * as this is just an optimization. */
      ldpp_dout(dpp, -1) << "ERROR: be->push() returned " << ret << dendl;
      return ret;
    }

    auto expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
    for (auto& bs : buckets) {
      update_renewed(bs, expiration);
    }
  }

  return 0;
}

void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
                                    ChangeStatusPtr& status)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (!changes.find(bs, status)) {
    status = ChangeStatusPtr(new ChangeStatus);
    changes.add(bs, status);
  }
}

void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
{
  std::scoped_lock l{lock};
  cur_cycle.insert(bs);
}

void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
                                       real_time expiration)
{
  std::scoped_lock l{lock};
  ChangeStatusPtr status;
  _get_change(bs, status);

  ldout(cct, 20) << "RGWDataChangesLog::update_renewed() bucket_name="
                 << bs.bucket.name << " shard_id=" << bs.shard_id
                 << " expiration=" << expiration << dendl;
  status->cur_expiration = expiration;
}

int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
  rgw_bucket_shard bs(bucket, shard_id);
  return choose_oid(bs);
}

bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
                                      const rgw_bucket& bucket,
                                      optional_yield y) const
{
  if (!bucket_filter) {
    return true;
  }

  return bucket_filter(bucket, y, dpp);
}

std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
  return (gen_id > 0 ?
          fmt::format("{}@G{}.{}", prefix, gen_id, i) :
          fmt::format("{}.{}", prefix, i));
}

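// Record a change on a bucket shard. If an entry for this shard was pushed
// within the current window (cur_expiration), only register it for renewal;
// if another thread is already pushing, wait on its RefCountedCond instead
// of issuing a duplicate write.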
int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
  auto& bucket = bucket_info.bucket;

  if (!filter_bucket(dpp, bucket, null_yield)) {
    return 0;
  }

  if (observer) {
    observer->on_bucket_changed(bucket.get_key());
  }

  rgw_bucket_shard bs(bucket, shard_id);

  int index = choose_oid(bs);
  mark_modified(index, bs);

  std::unique_lock l(lock);

  ChangeStatusPtr status;
  _get_change(bs, status);
  l.unlock();

  auto now = real_clock::now();

  std::unique_lock sl(status->lock);

  ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
                     << " shard_id=" << shard_id << " now=" << now
                     << " cur_expiration=" << status->cur_expiration << dendl;

  if (now < status->cur_expiration) {
    /* no need to send, recently completed */
    sl.unlock();
    register_renew(bs);
    return 0;
  }

  RefCountedCond* cond;

  if (status->pending) {
    cond = status->cond;

    ceph_assert(cond);

    status->cond->get();
    sl.unlock();

    int ret = cond->wait();
    cond->put();
    if (!ret) {
      register_renew(bs);
    }
    return ret;
  }

  status->cond = new RefCountedCond;
  status->pending = true;

  ceph::real_time expiration;

  int ret;

  do {
    status->cur_sent = now;

    expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);

    sl.unlock();

    ceph::buffer::list bl;
    rgw_data_change change;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = now;
    encode(change, bl);

    ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;

    auto be = bes->head();
    ret = be->push(dpp, index, now, change.key, std::move(bl));

    now = real_clock::now();

    sl.lock();

  } while (!ret && real_clock::now() > expiration);

  cond = status->cond;

  status->pending = false;
  /* time of when operation started, not completed */
  status->cur_expiration = status->cur_sent;
  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
  status->cond = nullptr;
  sl.unlock();

  cond->done(ret);
  cond->put();

  return ret;
}

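// List entries across generations. The caller's marker carries a generation
// id plus a per-backend cursor (split by cursorgen(), rebuilt by
// gencursor()); listing starts at the marker's generation and walks forward
// until max_entries is filled or the generations run out.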
int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries,
                          std::vector<rgw_data_change_log_entry>& entries,
                          std::string_view marker,
                          std::string* out_marker,
                          bool* truncated)
{
  const auto [start_id, start_cursor] = cursorgen(marker);
  auto gen_id = start_id;
  std::string out_cursor;
  while (max_entries > 0) {
    std::vector<rgw_data_change_log_entry> gentries;
    std::unique_lock l(m);
    auto i = lower_bound(gen_id);
    if (i == end()) return 0;
    auto be = i->second;
    l.unlock();
    gen_id = be->gen_id;
    auto r = be->list(dpp, shard, max_entries, gentries,
                      gen_id == start_id ? start_cursor : std::string{},
                      &out_cursor, truncated);
    if (r < 0)
      return r;

    if (out_marker && !out_cursor.empty()) {
      *out_marker = gencursor(gen_id, out_cursor);
    }
    for (auto& g : gentries) {
      g.log_id = gencursor(gen_id, g.log_id);
    }
    if (int s = gentries.size(); s < 0 || s > max_entries)
      max_entries = 0;
    else
      max_entries -= gentries.size();

    std::move(gentries.begin(), gentries.end(),
              std::back_inserter(entries));
    ++gen_id;
  }
  return 0;
}

int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
                                    std::string_view marker,
                                    std::string* out_marker, bool* truncated)
{
  assert(shard < num_shards);
  return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated);
}

int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
                                    LogMarker& marker, bool *ptruncated)
{
  bool truncated;
  entries.clear();
  for (; marker.shard < num_shards && int(entries.size()) < max_entries;
       marker.shard++, marker.marker.clear()) {
    int ret = list_entries(dpp, marker.shard, max_entries - entries.size(),
                           entries, marker.marker, NULL, &truncated);
    if (ret == -ENOENT) {
      continue;
    }
    if (ret < 0) {
      return ret;
    }
    if (truncated) {
      *ptruncated = true;
      return 0;
    }
  }
  *ptruncated = (marker.shard < num_shards);
  return 0;
}

int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info)
{
  assert(shard_id < num_shards);
  auto be = bes->head();
  auto r = be->get_info(dpp, shard_id, info);
  if (!info->marker.empty()) {
    info->marker = gencursor(be->gen_id, info->marker);
  }
  return r;
}

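// Trim synchronously up to a generation-qualified marker: fully trim every
// generation below the target, then trim the target generation to its
// cursor. -ENODATA from an already-empty lower generation is not an error.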
int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
{
  auto [target_gen, cursor] = cursorgen(marker);
  std::unique_lock l(m);
  const auto head_gen = (end() - 1)->second->gen_id;
  const auto tail_gen = begin()->first;
  if (target_gen < tail_gen) return 0;
  auto r = 0;
  for (auto be = lower_bound(0)->second;
       be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
       be = upper_bound(be->gen_id)->second) {
    l.unlock();
    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
    r = be->trim(dpp, shard_id, c);
    if (r == -ENOENT)
      r = -ENODATA;
    if (r == -ENODATA && be->gen_id < target_gen)
      r = 0;
    if (be->gen_id == target_gen)
      break;
    l.lock();
  }
  return r;
}

int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
{
  assert(shard_id < num_shards);
  return bes->trim_entries(dpp, shard_id, marker);
}

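// Completion for asynchronous multi-generation trims. Each handle() call
// finishes one generation's trim and re-arms itself on the next generation
// until the target generation is reached or a real error surfaces.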
class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
public:
  DataLogBackends* const bes;
  const int shard_id;
  const uint64_t target_gen;
  const std::string cursor;
  const uint64_t head_gen;
  const uint64_t tail_gen;
  boost::intrusive_ptr<RGWDataChangesBE> be;

  GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id, uint64_t target_gen,
          std::string cursor, uint64_t head_gen, uint64_t tail_gen,
          boost::intrusive_ptr<RGWDataChangesBE> be,
          lr::AioCompletion* super)
    : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen),
      cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
      be(std::move(be)) {}

  void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
    auto gen_id = be->gen_id;
    be.reset();
    if (r == -ENOENT)
      r = -ENODATA;
    if (r == -ENODATA && gen_id < target_gen)
      r = 0;
    if (r < 0) {
      complete(std::move(p), r);
      return;
    }

    {
      std::unique_lock l(bes->m);
      auto i = bes->upper_bound(gen_id);
      if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
        l.unlock();
        complete(std::move(p), -ENODATA);
        return;
      }
      be = i->second;
    }
    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
    be->trim(dpp, shard_id, c, call(std::move(p)));
  }
};

void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
                                   librados::AioCompletion* c)
{
  auto [target_gen, cursor] = cursorgen(marker);
  std::unique_lock l(m);
  const auto head_gen = (end() - 1)->second->gen_id;
  const auto tail_gen = begin()->first;
  if (target_gen < tail_gen) {
    l.unlock();
    rgw_complete_aio_completion(c, -ENODATA);
    return;
  }
  auto be = begin()->second;
  l.unlock();
  auto gt = std::make_unique<GenTrim>(dpp, this, shard_id, target_gen,
                                      std::string(cursor), head_gen, tail_gen,
                                      be, c);

  auto cc = be->gen_id == target_gen ? cursor : be->max_marker();
  be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt)));
}

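// Delete whole generations that have been fully trimmed: probe from the
// oldest generation up, stop at the first non-empty one, mark everything
// below it as empty, and remove the now-unused backends.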
int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
  if (size() != 1) {
    std::vector<mapped_type> candidates;
    {
      std::scoped_lock l(m);
      auto e = cend() - 1;
      for (auto i = cbegin(); i < e; ++i) {
        candidates.push_back(i->second);
      }
    }

    std::optional<uint64_t> highest;
    for (auto& be : candidates) {
      auto r = be->is_empty(dpp);
      if (r < 0) {
        return r;
      } else if (r == 1) {
        highest = be->gen_id;
      } else {
        break;
      }
    }

    through = highest;
    if (!highest) {
      return 0;
    }
    auto ec = empty_to(dpp, *highest, null_yield);
    if (ec) {
      return ceph::from_error_code(ec);
    }
  }

  return ceph::from_error_code(remove_empty(dpp, null_yield));
}

int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
                                    librados::AioCompletion* c)
{
  assert(shard_id < num_shards);
  bes->trim_entries(dpp, shard_id, marker, c);
  return 0;
}

bool RGWDataChangesLog::going_down() const
{
  return down_flag;
}

RGWDataChangesLog::~RGWDataChangesLog() {
  down_flag = true;
  if (renew_thread.joinable()) {
    renew_stop();
    renew_thread.join();
  }
}

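// Body of the rgw_dt_lg_renew thread: re-push unexpired entries every
// three quarters of rgw_data_log_window, and prune empty generations once
// every runs_per_prune iterations.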
void RGWDataChangesLog::renew_run() {
  static constexpr auto runs_per_prune = 150;
  auto run = 0;
  for (;;) {
    const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
    ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
    int r = renew_entries(&dp);
    if (r < 0) {
      ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
    }

    if (going_down())
      break;

    if (run == runs_per_prune) {
      std::optional<uint64_t> through;
      ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
      r = trim_generations(&dp, through);
      if (r < 0) {
        derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
             << r << dendl;
      } else if (through) {
        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
                          << "through " << *through << "." << dendl;
      } else {
        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
                          << dendl;
      }
      run = 0;
    } else {
      ++run;
    }

    int interval = cct->_conf->rgw_data_log_window * 3 / 4;
    std::unique_lock locker{renew_lock};
    renew_cond.wait_for(locker, std::chrono::seconds(interval));
  }
}

void RGWDataChangesLog::renew_stop()
{
  std::lock_guard l{renew_lock};
  renew_cond.notify_all();
}

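// Record that a datalog shard saw a change to this bucket shard, taking a
// shared lock for the common already-present case and an exclusive lock
// only for insertion.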
void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
{
  auto key = bs.get_key();
  {
    std::shared_lock rl{modified_lock}; // read lock to check for existence
    auto shard = modified_shards.find(shard_id);
    if (shard != modified_shards.end() && shard->second.count(key)) {
      return;
    }
  }

  std::unique_lock wl{modified_lock}; // write lock for insertion
  modified_shards[shard_id].insert(key);
}

std::string RGWDataChangesLog::max_marker() const {
  return gencursor(std::numeric_limits<uint64_t>::max(),
                   "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}

int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) {
  return ceph::from_error_code(bes->new_backing(dpp, type, y));
}

int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
  return bes->trim_generations(dpp, through);
}