]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_datalog.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / rgw / rgw_datalog.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include <vector>
5
6#include "common/debug.h"
7#include "common/containers.h"
8#include "common/errno.h"
9#include "common/error_code.h"
10
11#include "common/async/blocked_completion.h"
12#include "common/async/librados_completion.h"
13
14#include "cls/fifo/cls_fifo_types.h"
15#include "cls/log/cls_log_client.h"
16
17#include "cls_fifo_legacy.h"
18#include "rgw_datalog.h"
19#include "rgw_log_backing.h"
20#include "rgw_tools.h"
21
22#define dout_context g_ceph_context
23static constexpr auto dout_subsys = ceph_subsys_rgw;
24
25namespace bs = boost::system;
26namespace lr = librados;
27
28using ceph::containers::tiny_vector;
29
30void rgw_data_change::dump(ceph::Formatter *f) const
31{
32 std::string type;
33 switch (entity_type) {
34 case ENTITY_TYPE_BUCKET:
35 type = "bucket";
36 break;
37 default:
38 type = "unknown";
39 }
40 encode_json("entity_type", type, f);
41 encode_json("key", key, f);
42 utime_t ut(timestamp);
43 encode_json("timestamp", ut, f);
44}
45
46void rgw_data_change::decode_json(JSONObj *obj) {
47 std::string s;
48 JSONDecoder::decode_json("entity_type", s, obj);
49 if (s == "bucket") {
50 entity_type = ENTITY_TYPE_BUCKET;
51 } else {
52 entity_type = ENTITY_TYPE_UNKNOWN;
53 }
54 JSONDecoder::decode_json("key", key, obj);
55 utime_t ut;
56 JSONDecoder::decode_json("timestamp", ut, obj);
57 timestamp = ut.to_real_time();
58}
59
60void rgw_data_change_log_entry::dump(Formatter *f) const
61{
62 encode_json("log_id", log_id, f);
63 utime_t ut(log_timestamp);
64 encode_json("log_timestamp", ut, f);
65 encode_json("entry", entry, f);
66}
67
68void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
69 JSONDecoder::decode_json("log_id", log_id, obj);
70 utime_t ut;
71 JSONDecoder::decode_json("log_timestamp", ut, obj);
72 log_timestamp = ut.to_real_time();
73 JSONDecoder::decode_json("entry", entry, obj);
74}
75
76class RGWDataChangesOmap final : public RGWDataChangesBE {
77 using centries = std::list<cls_log_entry>;
78 std::vector<std::string> oids;
79
80public:
81 RGWDataChangesOmap(lr::IoCtx& ioctx,
82 RGWDataChangesLog& datalog,
83 uint64_t gen_id,
84 int num_shards)
85 : RGWDataChangesBE(ioctx, datalog, gen_id) {
86 oids.reserve(num_shards);
87 for (auto i = 0; i < num_shards; ++i) {
88 oids.push_back(get_oid(i));
89 }
90 }
91 ~RGWDataChangesOmap() override = default;
b3b6e05e 92
f67539c2
TL
93 void prepare(ceph::real_time ut, const std::string& key,
94 ceph::buffer::list&& entry, entries& out) override {
95 if (!std::holds_alternative<centries>(out)) {
96 ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
97 out = centries();
98 }
99
100 cls_log_entry e;
101 cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
102 std::get<centries>(out).push_back(std::move(e));
103 }
b3b6e05e 104 int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
f67539c2
TL
105 lr::ObjectWriteOperation op;
106 cls_log_add(op, std::get<centries>(items), true);
b3b6e05e 107 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
f67539c2 108 if (r < 0) {
b3b6e05e 109 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
110 << ": failed to push to " << oids[index] << cpp_strerror(-r)
111 << dendl;
112 }
113 return r;
114 }
b3b6e05e 115 int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
f67539c2
TL
116 const std::string& key,
117 ceph::buffer::list&& bl) override {
118 lr::ObjectWriteOperation op;
119 cls_log_add(op, utime_t(now), {}, key, bl);
b3b6e05e 120 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
f67539c2 121 if (r < 0) {
b3b6e05e 122 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
123 << ": failed to push to " << oids[index]
124 << cpp_strerror(-r) << dendl;
125 }
126 return r;
127 }
b3b6e05e 128 int list(const DoutPrefixProvider *dpp, int index, int max_entries,
f67539c2
TL
129 std::vector<rgw_data_change_log_entry>& entries,
130 std::optional<std::string_view> marker,
131 std::string* out_marker, bool* truncated) override {
132 std::list<cls_log_entry> log_entries;
133 lr::ObjectReadOperation op;
134 cls_log_list(op, {}, {}, std::string(marker.value_or("")),
135 max_entries, log_entries, out_marker, truncated);
b3b6e05e 136 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
f67539c2
TL
137 if (r == -ENOENT) {
138 *truncated = false;
139 return 0;
140 }
141 if (r < 0) {
b3b6e05e 142 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
143 << ": failed to list " << oids[index]
144 << cpp_strerror(-r) << dendl;
145 return r;
146 }
147 for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
148 rgw_data_change_log_entry log_entry;
149 log_entry.log_id = iter->id;
150 auto rt = iter->timestamp.to_real_time();
151 log_entry.log_timestamp = rt;
152 auto liter = iter->data.cbegin();
153 try {
154 decode(log_entry.entry, liter);
155 } catch (ceph::buffer::error& err) {
b3b6e05e 156 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
157 << ": failed to decode data changes log entry: "
158 << err.what() << dendl;
159 return -EIO;
160 }
161 entries.push_back(log_entry);
162 }
163 return 0;
164 }
b3b6e05e 165 int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
f67539c2
TL
166 cls_log_header header;
167 lr::ObjectReadOperation op;
168 cls_log_info(op, &header);
b3b6e05e 169 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
f67539c2
TL
170 if (r == -ENOENT) r = 0;
171 if (r < 0) {
b3b6e05e 172 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
173 << ": failed to get info from " << oids[index]
174 << cpp_strerror(-r) << dendl;
175 } else {
176 info->marker = header.max_marker;
177 info->last_update = header.max_time.to_real_time();
178 }
179 return r;
180 }
b3b6e05e 181 int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
f67539c2
TL
182 lr::ObjectWriteOperation op;
183 cls_log_trim(op, {}, {}, {}, std::string(marker));
b3b6e05e 184 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
f67539c2
TL
185 if (r == -ENOENT) r = -ENODATA;
186 if (r < 0 && r != -ENODATA) {
b3b6e05e 187 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
188 << ": failed to get info from " << oids[index]
189 << cpp_strerror(-r) << dendl;
190 }
191 return r;
192 }
b3b6e05e 193 int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
f67539c2
TL
194 lr::AioCompletion* c) override {
195 lr::ObjectWriteOperation op;
196 cls_log_trim(op, {}, {}, {}, std::string(marker));
197 auto r = ioctx.aio_operate(oids[index], c, &op, 0);
198 if (r == -ENOENT) r = -ENODATA;
199 if (r < 0) {
20effc67 200 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
201 << ": failed to get info from " << oids[index]
202 << cpp_strerror(-r) << dendl;
203 }
204 return r;
205 }
206 std::string_view max_marker() const override {
20effc67 207 return "99999999";
f67539c2 208 }
b3b6e05e 209 int is_empty(const DoutPrefixProvider *dpp) override {
f67539c2
TL
210 for (auto shard = 0u; shard < oids.size(); ++shard) {
211 std::list<cls_log_entry> log_entries;
212 lr::ObjectReadOperation op;
213 std::string out_marker;
214 bool truncated;
215 cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
b3b6e05e 216 auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield);
f67539c2
TL
217 if (r == -ENOENT) {
218 continue;
219 }
220 if (r < 0) {
b3b6e05e 221 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
222 << ": failed to list " << oids[shard]
223 << cpp_strerror(-r) << dendl;
224 return r;
225 }
226 if (!log_entries.empty()) {
227 return 0;
228 }
229 }
230 return 1;
231 }
232};
233
// FIFO (cls_fifo) backend for the data changes log: one lazily-opened
// FIFO queue per shard.
class RGWDataChangesFIFO final : public RGWDataChangesBE {
  using centries = std::vector<ceph::buffer::list>;
  tiny_vector<LazyFIFO> fifos; // per-shard queues, indexed by shard

public:
  RGWDataChangesFIFO(lr::IoCtx& ioctx,
		     RGWDataChangesLog& datalog,
		     uint64_t gen_id, int shards)
    : RGWDataChangesBE(ioctx, datalog, gen_id),
      fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) {
	emplacer.emplace(ioctx, get_oid(i));
      }) {}
  ~RGWDataChangesFIFO() override = default;
  // Stage an already-encoded entry; the timestamp and key parameters are
  // unused by the FIFO backend (they are embedded in the entry itself).
  void prepare(ceph::real_time, const std::string&,
	       ceph::buffer::list&& entry, entries& out) override {
    if (!std::holds_alternative<centries>(out)) {
      // The variant must still be empty before we switch it to our type.
      ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
      out = centries();
    }
    std::get<centries>(out).push_back(std::move(entry));
  }
  // Push a prepared batch to one shard's FIFO. Returns 0 or -errno.
  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
    auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			 << ": unable to push to FIFO: " << get_oid(index)
			 << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  // Push a single entry to one shard's FIFO. Returns 0 or -errno.
  int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
	   const std::string&,
	   ceph::buffer::list&& bl) override {
    auto r = fifos[index].push(dpp, std::move(bl), null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			 << ": unable to push to FIFO: " << get_oid(index)
			 << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  // List up to max_entries from one shard starting after `marker`.
  int list(const DoutPrefixProvider *dpp, int index, int max_entries,
	   std::vector<rgw_data_change_log_entry>& entries,
	   std::optional<std::string_view> marker,
	   std::string* out_marker, bool* truncated) override {
    std::vector<rgw::cls::fifo::list_entry> log_entries;
    bool more = false;
    auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
			       null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			 << ": unable to list FIFO: " << get_oid(index)
			 << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    for (const auto& entry : log_entries) {
      rgw_data_change_log_entry log_entry;
      // The FIFO marker doubles as the log id for this backend.
      log_entry.log_id = entry.marker;
      log_entry.log_timestamp = entry.mtime;
      auto liter = entry.data.cbegin();
      try {
	decode(log_entry.entry, liter);
      } catch (const buffer::error& err) {
	ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			   << ": failed to decode data changes log entry: "
			   << err.what() << dendl;
	return -EIO;
      }
      entries.push_back(std::move(log_entry));
    }
    if (truncated)
      *truncated = more;
    if (out_marker && !log_entries.empty()) {
      *out_marker = log_entries.back().marker;
    }
    return 0;
  }
  // Report the newest marker/mtime for one shard from FIFO metadata.
  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
    auto& fifo = fifos[index];
    auto r = fifo.read_meta(dpp, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			 << ": unable to get FIFO metadata: " << get_oid(index)
			 << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    rados::cls::fifo::info m;
    fifo.meta(dpp, m, null_yield);
    auto p = m.head_part_num;
    if (p < 0) {
      // Negative head part number: the FIFO has never been written to.
      info->marker = "";
      info->last_update = ceph::real_clock::zero();
      return 0;
    }
    rgw::cls::fifo::part_info h;
    r = fifo.get_part_info(dpp, p, &h, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			 << ": unable to get part info: " << get_oid(index) << "/" << p
			 << ": " << cpp_strerror(-r) << dendl;
      return r;
    }
    info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
    info->last_update = h.max_time;
    return 0;
  }
  // Synchronously trim one shard up to `marker`.
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
    auto r = fifos[index].trim(dpp, marker, false, null_yield);
    if (r < 0) {
      ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			 << ": unable to trim FIFO: " << get_oid(index)
			 << ": " << cpp_strerror(-r) << dendl;
    }
    return r;
  }
  // Asynchronous trim; completion `c` fires when the trim finishes.
  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
	   librados::AioCompletion* c) override {
    int r = 0;
    if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
      // Trimming to the zero marker is a no-op; complete immediately.
      rgw_complete_aio_completion(c, -ENODATA);
    } else {
      fifos[index].trim(dpp, marker, false, c, null_yield);
    }
    return r;
  }
  std::string_view max_marker() const override {
    // Largest possible FIFO marker; used to trim an entire generation.
    static const std::string mm =
      rgw::cls::fifo::marker::max().to_string();
    return std::string_view(mm);
  }
  // Returns 1 if every shard is empty, 0 if any entry exists, <0 on error.
  int is_empty(const DoutPrefixProvider *dpp) override {
    std::vector<rgw::cls::fifo::list_entry> log_entries;
    bool more = false;
    for (auto shard = 0u; shard < fifos.size(); ++shard) {
      auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more,
				 null_yield);
      if (r < 0) {
	ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
			   << ": unable to list FIFO: " << get_oid(shard)
			   << ": " << cpp_strerror(-r) << dendl;
	return r;
      }
      if (!log_entries.empty()) {
	return 0;
      }
    }
    return 1;
  }
};
383
// Sizes the shard count and the in-memory change-status LRU from
// configuration; backends are attached later in start().
RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
  : cct(cct),
    num_shards(cct->_conf->rgw_data_log_num_shards),
    prefix(get_prefix()),
    changes(cct->_conf->rgw_data_log_changes_size) {}
389
390bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
391 std::unique_lock l(m);
392
393 for (const auto& [gen_id, gen] : e) {
394 if (gen.pruned) {
395 lderr(datalog.cct)
396 << __PRETTY_FUNCTION__ << ":" << __LINE__
397 << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
398 }
399 if (count(gen_id) != 0) {
400 lderr(datalog.cct)
401 << __PRETTY_FUNCTION__ << ":" << __LINE__
402 << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
403 }
404 try {
405 switch (gen.type) {
406 case log_type::omap:
407 emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
408 break;
409 case log_type::fifo:
410 emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
411 break;
412 default:
413 lderr(datalog.cct)
414 << __PRETTY_FUNCTION__ << ":" << __LINE__
415 << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
416 << ", type" << gen.type << dendl;
417 return bs::error_code(EFAULT, bs::system_category());
418 }
419 } catch (const bs::system_error& err) {
420 lderr(datalog.cct)
421 << __PRETTY_FUNCTION__ << ":" << __LINE__
422 << ": error setting up backend: gen_id=" << gen_id
423 << ", err=" << err.what() << dendl;
424 return err.code();
425 }
426 }
427 return {};
428}
// Generations announced after startup are handled exactly like the
// initial set: build a backend object for each.
bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
  return handle_init(std::move(e));
}
// Drop in-memory backend objects for generations at or below new_tail.
bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
  std::unique_lock l(m);
  auto i = cbegin();
  // NOTE(review): this guard returns early when the oldest generation is
  // *below* new_tail, which looks inverted relative to the erase below
  // (which removes generations <= new_tail) — confirm intent upstream.
  if (i->first < new_tail) {
    return {};
  }
  // Never erase the head (newest) generation.
  if (new_tail >= (cend() - 1)->first) {
    lderr(datalog.cct)
      << __PRETTY_FUNCTION__ << ":" << __LINE__
      << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
    return bs::error_code(EFAULT, bs::system_category());
  }
  erase(i, upper_bound(new_tail));
  return {};
}
447
448
20effc67 449int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
f67539c2
TL
450 const RGWZoneParams& zoneparams,
451 librados::Rados* lr)
452{
453 zone = _zone;
454 ceph_assert(zone);
455 auto defbacking = to_log_type(
456 cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
457 // Should be guaranteed by `set_enum_allowed`
458 ceph_assert(defbacking);
459 auto log_pool = zoneparams.log_pool;
b3b6e05e 460 auto r = rgw_init_ioctx(dpp, lr, log_pool, ioctx, true, false);
f67539c2 461 if (r < 0) {
b3b6e05e 462 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
463 << ": Failed to initialized ioctx, r=" << r
464 << ", pool=" << log_pool << dendl;
465 return -r;
466 }
467
468 auto besr = logback_generations::init<DataLogBackends>(
b3b6e05e 469 dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
f67539c2
TL
470 return get_oid(gen_id, shard);
471 },
472 num_shards, *defbacking, null_yield, *this);
473
474
475 if (!besr) {
20effc67 476 lderr(cct) << __PRETTY_FUNCTION__
f67539c2
TL
477 << ": Error initializing backends: "
478 << besr.error().message() << dendl;
479 return ceph::from_error_code(besr.error());
480 }
481
482 bes = std::move(*besr);
483 renew_thread = make_named_thread("rgw_dt_lg_renew",
484 &RGWDataChangesLog::renew_run, this);
485 return 0;
486}
487
488int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
489 const auto& name = bs.bucket.name;
490 auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
491 auto r = (ceph_str_hash_linux(name.data(), name.size()) +
492 shard_shift) % num_shards;
493 return static_cast<int>(r);
494}
495
// Re-publish every bucket shard recorded in the current cycle so peers
// keep seeing activity for buckets with long-running change windows.
int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
{
  if (!zone->log_data)
    return 0;

  /* we can't keep the bucket name as part of the cls_log_entry, and we need
   * it later, so we keep two lists under the map */
  bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
			      RGWDataChangesBE::entries>> m;

  // Steal the current cycle's set under the lock, then work on it unlocked.
  std::unique_lock l(lock);
  decltype(cur_cycle) entries;
  entries.swap(cur_cycle);
  l.unlock();

  auto ut = real_clock::now();
  auto be = bes->head();
  // Group the staged changes by destination shard.
  for (const auto& bs : entries) {
    auto index = choose_oid(bs);

    rgw_data_change change;
    bufferlist bl;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = ut;
    encode(change, bl);

    m[index].first.push_back(bs);
    be->prepare(ut, change.key, std::move(bl), m[index].second);
  }

  // Push each shard's batch and refresh the expiration of its buckets.
  for (auto& [index, p] : m) {
    auto& [buckets, entries] = p;

    auto now = real_clock::now();

    auto ret = be->push(dpp, index, std::move(entries));
    if (ret < 0) {
      /* we don't really need to have a special handling for failed cases here,
       * as this is just an optimization. */
      ldpp_dout(dpp, -1) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
      return ret;
    }

    auto expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
    for (auto& bs : buckets) {
      update_renewed(bs, expiration);
    }
  }

  return 0;
}
549
// Look up (or lazily create) the cached ChangeStatus for a bucket shard.
// Caller must hold `lock`.
void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
				    ChangeStatusPtr& status)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  if (!changes.find(bs, status)) {
    // Not cached yet: create a fresh status and remember it in the LRU.
    status = ChangeStatusPtr(new ChangeStatus);
    changes.add(bs, status);
  }
}
559
560void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
561{
562 std::scoped_lock l{lock};
563 cur_cycle.insert(bs);
564}
565
566void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
567 real_time expiration)
568{
2a845540 569 std::unique_lock l{lock};
f67539c2
TL
570 ChangeStatusPtr status;
571 _get_change(bs, status);
2a845540
TL
572 l.unlock();
573
f67539c2
TL
574
575 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
576 << bs.bucket.name << " shard_id=" << bs.shard_id
577 << " expiration=" << expiration << dendl;
2a845540
TL
578
579 std::unique_lock sl(status->lock);
f67539c2
TL
580 status->cur_expiration = expiration;
581}
582
583int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
584 rgw_bucket_shard bs(bucket, shard_id);
585 return choose_oid(bs);
586}
587
b3b6e05e
TL
588bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
589 const rgw_bucket& bucket,
f67539c2
TL
590 optional_yield y) const
591{
592 if (!bucket_filter) {
593 return true;
594 }
595
b3b6e05e 596 return bucket_filter(bucket, y, dpp);
f67539c2
TL
597}
598
599std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
600 return (gen_id > 0 ?
601 fmt::format("{}@G{}.{}", prefix, gen_id, i) :
602 fmt::format("{}.{}", prefix, i));
603}
604
// Record a change for one bucket shard. Coalesces bursts: within the
// configured window only the first caller pushes; concurrent callers
// wait on a shared RefCountedCond, and later callers merely register
// for the renew cycle.
int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
  auto& bucket = bucket_info.bucket;

  if (!filter_bucket(dpp, bucket, null_yield)) {
    return 0;
  }

  if (observer) {
    observer->on_bucket_changed(bucket.get_key());
  }

  rgw_bucket_shard bs(bucket, shard_id);

  int index = choose_oid(bs);
  mark_modified(index, bs);

  std::unique_lock l(lock);

  ChangeStatusPtr status;
  _get_change(bs, status);
  l.unlock();

  auto now = real_clock::now();

  std::unique_lock sl(status->lock);

  ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
		     << " shard_id=" << shard_id << " now=" << now
		     << " cur_expiration=" << status->cur_expiration << dendl;

  if (now < status->cur_expiration) {
    /* no need to send, recently completed */
    sl.unlock();
    register_renew(bs);
    return 0;
  }

  RefCountedCond* cond;

  if (status->pending) {
    // Another thread is already pushing for this shard; wait for its
    // result instead of issuing a duplicate push.
    cond = status->cond;

    ceph_assert(cond);

    status->cond->get();
    sl.unlock();

    int ret = cond->wait();
    cond->put();
    if (!ret) {
      register_renew(bs);
    }
    return ret;
  }

  // We are the pushing thread: publish a cond for others to wait on.
  status->cond = new RefCountedCond;
  status->pending = true;

  ceph::real_time expiration;

  int ret;

  do {
    status->cur_sent = now;

    expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);

    // Drop the status lock across the (possibly slow) RADOS push.
    sl.unlock();

    ceph::buffer::list bl;
    rgw_data_change change;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = now;
    encode(change, bl);

    ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;

    auto be = bes->head();
    ret = be->push(dpp, index, now, change.key, std::move(bl));

    now = real_clock::now();

    sl.lock();

    // Retry if the push succeeded but took longer than the window, so the
    // recorded timestamp stays within the advertised expiration.
  } while (!ret && real_clock::now() > expiration);

  cond = status->cond;

  status->pending = false;
  /* time of when operation started, not completed */
  status->cur_expiration = status->cur_sent;
  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
  status->cond = nullptr;
  sl.unlock();

  // Wake every thread that piggy-backed on this push.
  cond->done(ret);
  cond->put();

  return ret;
}
707
// List entries for one shard across generations, starting at `marker`
// (a generation-qualified cursor) and walking newer generations until
// max_entries are collected or the generations run out.
int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries,
			  std::vector<rgw_data_change_log_entry>& entries,
			  std::string_view marker,
			  std::string* out_marker,
			  bool* truncated)
{
  const auto [start_id, start_cursor] = cursorgen(marker);
  auto gen_id = start_id;
  std::string out_cursor;
  while (max_entries > 0) {
    std::vector<rgw_data_change_log_entry> gentries;
    std::unique_lock l(m);
    auto i = lower_bound(gen_id);
    if (i == end()) return 0;
    auto be = i->second;
    l.unlock();
    gen_id = be->gen_id;
    // Only the starting generation resumes from the caller's cursor;
    // later generations are read from their beginning.
    auto r = be->list(dpp, shard, max_entries, gentries,
		      gen_id == start_id ? start_cursor : std::string{},
		      &out_cursor, truncated);
    if (r < 0)
      return r;

    if (out_marker && !out_cursor.empty()) {
      *out_marker = gencursor(gen_id, out_cursor);
    }
    // Qualify every returned id with its generation.
    for (auto& g : gentries) {
      g.log_id = gencursor(gen_id, g.log_id);
    }
    // Defensive clamp: never let max_entries go negative.
    if (int s = gentries.size(); s < 0 || s > max_entries)
      max_entries = 0;
    else
      max_entries -= gentries.size();

    std::move(gentries.begin(), gentries.end(),
	      std::back_inserter(entries));
    ++gen_id;
  }
  return 0;
}
748
b3b6e05e 749int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2 750 std::vector<rgw_data_change_log_entry>& entries,
522d829b 751 std::string_view marker,
f67539c2
TL
752 std::string* out_marker, bool* truncated)
753{
754 assert(shard < num_shards);
b3b6e05e 755 return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated);
f67539c2
TL
756}
757
b3b6e05e 758int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
759 std::vector<rgw_data_change_log_entry>& entries,
760 LogMarker& marker, bool *ptruncated)
761{
762 bool truncated;
763 entries.clear();
764 for (; marker.shard < num_shards && int(entries.size()) < max_entries;
522d829b 765 marker.shard++, marker.marker.clear()) {
b3b6e05e 766 int ret = list_entries(dpp, marker.shard, max_entries - entries.size(),
f67539c2
TL
767 entries, marker.marker, NULL, &truncated);
768 if (ret == -ENOENT) {
769 continue;
770 }
771 if (ret < 0) {
772 return ret;
773 }
1d09f67e
TL
774 if (!truncated) {
775 *ptruncated = false;
f67539c2
TL
776 return 0;
777 }
778 }
779 *ptruncated = (marker.shard < num_shards);
780 return 0;
781}
782
b3b6e05e 783int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info)
f67539c2
TL
784{
785 assert(shard_id < num_shards);
786 auto be = bes->head();
b3b6e05e 787 auto r = be->get_info(dpp, shard_id, info);
f67539c2
TL
788 if (!info->marker.empty()) {
789 info->marker = gencursor(be->gen_id, info->marker);
790 }
791 return r;
792}
793
// Synchronously trim one shard up to a generation-qualified marker,
// walking from the tail generation toward the target. The map lock is
// released around each backend trim and re-taken to find the next one.
int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
{
  auto [target_gen, cursor] = cursorgen(marker);
  std::unique_lock l(m);
  const auto head_gen = (end() - 1)->second->gen_id;
  const auto tail_gen = begin()->first;
  if (target_gen < tail_gen) return 0; // already trimmed away
  auto r = 0;
  for (auto be = lower_bound(0)->second;
       be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
       be = upper_bound(be->gen_id)->second) {
    l.unlock();
    // Generations before the target are trimmed completely.
    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
    r = be->trim(dpp, shard_id, c);
    if (r == -ENOENT)
      r = -ENODATA;
    // Nothing to trim in an older generation is not an error.
    if (r == -ENODATA && be->gen_id < target_gen)
      r = 0;
    if (be->gen_id == target_gen)
      break;
    l.lock();
  };
  return r;
}
818
b3b6e05e 819int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
f67539c2
TL
820{
821 assert(shard_id < num_shards);
b3b6e05e 822 return bes->trim_entries(dpp, shard_id, marker);
f67539c2
TL
823}
824
825class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
826public:
827 DataLogBackends* const bes;
828 const int shard_id;
829 const uint64_t target_gen;
830 const std::string cursor;
831 const uint64_t head_gen;
832 const uint64_t tail_gen;
833 boost::intrusive_ptr<RGWDataChangesBE> be;
834
b3b6e05e 835 GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id, uint64_t target_gen,
f67539c2
TL
836 std::string cursor, uint64_t head_gen, uint64_t tail_gen,
837 boost::intrusive_ptr<RGWDataChangesBE> be,
838 lr::AioCompletion* super)
b3b6e05e 839 : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen),
f67539c2
TL
840 cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
841 be(std::move(be)) {}
842
b3b6e05e 843 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
844 auto gen_id = be->gen_id;
845 be.reset();
846 if (r == -ENOENT)
847 r = -ENODATA;
848 if (r == -ENODATA && gen_id < target_gen)
849 r = 0;
850 if (r < 0) {
851 complete(std::move(p), r);
852 return;
853 }
854
855 {
856 std::unique_lock l(bes->m);
857 auto i = bes->upper_bound(gen_id);
858 if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
859 l.unlock();
860 complete(std::move(p), -ENODATA);
861 return;
862 }
863 be = i->second;
864 }
865 auto c = be->gen_id == target_gen ? cursor : be->max_marker();
b3b6e05e 866 be->trim(dpp, shard_id, c, call(std::move(p)));
f67539c2
TL
867 }
868};
869
// Asynchronous trim of one shard up to a generation-qualified marker;
// kicks off a GenTrim completion chain starting at the tail generation.
void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
				   librados::AioCompletion* c)
{
  auto [target_gen, cursor] = cursorgen(marker);
  std::unique_lock l(m);
  const auto head_gen = (end() - 1)->second->gen_id;
  const auto tail_gen = begin()->first;
  if (target_gen < tail_gen) {
    // Everything up to the marker is already gone; complete immediately.
    l.unlock();
    rgw_complete_aio_completion(c, -ENODATA);
    return;
  }
  auto be = begin()->second;
  l.unlock();
  auto gt = std::make_unique<GenTrim>(dpp, this, shard_id, target_gen,
				      std::string(cursor), head_gen, tail_gen,
				      be, c);

  // Generations before the target are trimmed completely.
  auto cc = be->gen_id == target_gen ? cursor : be->max_marker();
  be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt)));
}
891
// Prune fully-empty generations below the head. On success `through`
// holds the highest generation pruned (or nullopt if none were empty).
int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
  if (size() != 1) {
    // Collect every generation except the head (the head is never pruned).
    std::vector<mapped_type> candidates;
    {
      std::scoped_lock l(m);
      auto e = cend() - 1;
      for (auto i = cbegin(); i < e; ++i) {
	candidates.push_back(i->second);
      }
    }

    // Walk from the tail: keep extending `highest` while generations are
    // empty, stop at the first non-empty one.
    std::optional<uint64_t> highest;
    for (auto& be : candidates) {
      auto r = be->is_empty(dpp);
      if (r < 0) {
	return r;
      } else if (r == 1) {
	highest = be->gen_id;
      } else {
	break;
      }
    }

    through = highest;
    if (!highest) {
      return 0;
    }
    auto ec = empty_to(dpp, *highest, null_yield);
    if (ec) {
      return ceph::from_error_code(ec);
    }
  }

  return ceph::from_error_code(remove_empty(dpp, null_yield));
}
927
928
b3b6e05e 929int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2
TL
930 librados::AioCompletion* c)
931{
932 assert(shard_id < num_shards);
b3b6e05e 933 bes->trim_entries(dpp, shard_id, marker, c);
f67539c2
TL
934 return 0;
935}
936
// True once shutdown has been requested (set by the destructor); polled
// by the renew thread to exit its loop.
bool RGWDataChangesLog::going_down() const
{
  return down_flag;
}
941
// Signal shutdown, wake the renew thread, and join it before the members
// it uses are destroyed.
RGWDataChangesLog::~RGWDataChangesLog() {
  down_flag = true;
  if (renew_thread.joinable()) {
    renew_stop();
    renew_thread.join();
  }
}
949
20effc67 950void RGWDataChangesLog::renew_run() noexcept {
f67539c2
TL
951 static constexpr auto runs_per_prune = 150;
952 auto run = 0;
953 for (;;) {
b3b6e05e
TL
954 const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
955 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
956 int r = renew_entries(&dp);
f67539c2 957 if (r < 0) {
b3b6e05e 958 ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
f67539c2
TL
959 }
960
961 if (going_down())
962 break;
963
964 if (run == runs_per_prune) {
965 std::optional<uint64_t> through;
b3b6e05e
TL
966 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
967 trim_generations(&dp, through);
f67539c2
TL
968 if (r < 0) {
969 derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
970 << r << dendl;
971 } else if (through) {
b3b6e05e 972 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
f67539c2
TL
973 << "through " << *through << "." << dendl;
974 } else {
b3b6e05e 975 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
f67539c2
TL
976 << dendl;
977 }
978 run = 0;
979 } else {
980 ++run;
981 }
982
983 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
984 std::unique_lock locker{renew_lock};
985 renew_cond.wait_for(locker, std::chrono::seconds(interval));
986 }
987}
988
989void RGWDataChangesLog::renew_stop()
990{
991 std::lock_guard l{renew_lock};
992 renew_cond.notify_all();
993}
994
995void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
996{
20effc67
TL
997 if (!cct->_conf->rgw_data_notify_interval_msec) {
998 return;
999 }
1000
f67539c2
TL
1001 auto key = bs.get_key();
1002 {
1003 std::shared_lock rl{modified_lock}; // read lock to check for existence
1004 auto shard = modified_shards.find(shard_id);
1005 if (shard != modified_shards.end() && shard->second.count(key)) {
1006 return;
1007 }
1008 }
1009
1010 std::unique_lock wl{modified_lock}; // write lock for insertion
1011 modified_shards[shard_id].insert(key);
1012}
1013
1014std::string RGWDataChangesLog::max_marker() const {
1015 return gencursor(std::numeric_limits<uint64_t>::max(),
1016 "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
1017}
1018
b3b6e05e
TL
1019int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) {
1020 return ceph::from_error_code(bes->new_backing(dpp, type, y));
f67539c2
TL
1021}
1022
b3b6e05e
TL
1023int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
1024 return bes->trim_generations(dpp, through);
f67539c2 1025}
20effc67
TL
1026
1027void RGWDataChangesLogInfo::dump(Formatter *f) const
1028{
1029 encode_json("marker", marker, f);
1030 utime_t ut(last_update);
1031 encode_json("last_update", ut, f);
1032}
1033
1034void RGWDataChangesLogInfo::decode_json(JSONObj *obj)
1035{
1036 JSONDecoder::decode_json("marker", marker, obj);
1037 utime_t ut;
1038 JSONDecoder::decode_json("last_update", ut, obj);
1039 last_update = ut.to_real_time();
1040}
1041
1042