]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_datalog.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_datalog.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include <vector>
5
6#include "common/debug.h"
7#include "common/containers.h"
8#include "common/errno.h"
9#include "common/error_code.h"
10
11#include "common/async/blocked_completion.h"
12#include "common/async/librados_completion.h"
13
14#include "cls/fifo/cls_fifo_types.h"
15#include "cls/log/cls_log_client.h"
16
17#include "cls_fifo_legacy.h"
18#include "rgw_datalog.h"
19#include "rgw_log_backing.h"
20#include "rgw_tools.h"
21
22#define dout_context g_ceph_context
23static constexpr auto dout_subsys = ceph_subsys_rgw;
24
25namespace bs = boost::system;
26namespace lr = librados;
27
28using ceph::containers::tiny_vector;
29
30void rgw_data_change::dump(ceph::Formatter *f) const
31{
32 std::string type;
33 switch (entity_type) {
34 case ENTITY_TYPE_BUCKET:
35 type = "bucket";
36 break;
37 default:
38 type = "unknown";
39 }
40 encode_json("entity_type", type, f);
41 encode_json("key", key, f);
42 utime_t ut(timestamp);
43 encode_json("timestamp", ut, f);
44}
45
46void rgw_data_change::decode_json(JSONObj *obj) {
47 std::string s;
48 JSONDecoder::decode_json("entity_type", s, obj);
49 if (s == "bucket") {
50 entity_type = ENTITY_TYPE_BUCKET;
51 } else {
52 entity_type = ENTITY_TYPE_UNKNOWN;
53 }
54 JSONDecoder::decode_json("key", key, obj);
55 utime_t ut;
56 JSONDecoder::decode_json("timestamp", ut, obj);
57 timestamp = ut.to_real_time();
58}
59
60void rgw_data_change_log_entry::dump(Formatter *f) const
61{
62 encode_json("log_id", log_id, f);
63 utime_t ut(log_timestamp);
64 encode_json("log_timestamp", ut, f);
65 encode_json("entry", entry, f);
66}
67
68void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
69 JSONDecoder::decode_json("log_id", log_id, obj);
70 utime_t ut;
71 JSONDecoder::decode_json("log_timestamp", ut, obj);
72 log_timestamp = ut.to_real_time();
73 JSONDecoder::decode_json("entry", entry, obj);
74}
75
76class RGWDataChangesOmap final : public RGWDataChangesBE {
77 using centries = std::list<cls_log_entry>;
78 std::vector<std::string> oids;
79
80public:
81 RGWDataChangesOmap(lr::IoCtx& ioctx,
82 RGWDataChangesLog& datalog,
83 uint64_t gen_id,
84 int num_shards)
85 : RGWDataChangesBE(ioctx, datalog, gen_id) {
86 oids.reserve(num_shards);
87 for (auto i = 0; i < num_shards; ++i) {
88 oids.push_back(get_oid(i));
89 }
90 }
91 ~RGWDataChangesOmap() override = default;
b3b6e05e 92
f67539c2
TL
93 void prepare(ceph::real_time ut, const std::string& key,
94 ceph::buffer::list&& entry, entries& out) override {
95 if (!std::holds_alternative<centries>(out)) {
96 ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
97 out = centries();
98 }
99
100 cls_log_entry e;
101 cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
102 std::get<centries>(out).push_back(std::move(e));
103 }
b3b6e05e 104 int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
f67539c2
TL
105 lr::ObjectWriteOperation op;
106 cls_log_add(op, std::get<centries>(items), true);
b3b6e05e 107 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
f67539c2 108 if (r < 0) {
b3b6e05e 109 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
110 << ": failed to push to " << oids[index] << cpp_strerror(-r)
111 << dendl;
112 }
113 return r;
114 }
b3b6e05e 115 int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
f67539c2
TL
116 const std::string& key,
117 ceph::buffer::list&& bl) override {
118 lr::ObjectWriteOperation op;
119 cls_log_add(op, utime_t(now), {}, key, bl);
b3b6e05e 120 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
f67539c2 121 if (r < 0) {
b3b6e05e 122 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
123 << ": failed to push to " << oids[index]
124 << cpp_strerror(-r) << dendl;
125 }
126 return r;
127 }
b3b6e05e 128 int list(const DoutPrefixProvider *dpp, int index, int max_entries,
f67539c2
TL
129 std::vector<rgw_data_change_log_entry>& entries,
130 std::optional<std::string_view> marker,
131 std::string* out_marker, bool* truncated) override {
132 std::list<cls_log_entry> log_entries;
133 lr::ObjectReadOperation op;
134 cls_log_list(op, {}, {}, std::string(marker.value_or("")),
135 max_entries, log_entries, out_marker, truncated);
b3b6e05e 136 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
f67539c2
TL
137 if (r == -ENOENT) {
138 *truncated = false;
139 return 0;
140 }
141 if (r < 0) {
b3b6e05e 142 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
143 << ": failed to list " << oids[index]
144 << cpp_strerror(-r) << dendl;
145 return r;
146 }
147 for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
148 rgw_data_change_log_entry log_entry;
149 log_entry.log_id = iter->id;
150 auto rt = iter->timestamp.to_real_time();
151 log_entry.log_timestamp = rt;
152 auto liter = iter->data.cbegin();
153 try {
154 decode(log_entry.entry, liter);
155 } catch (ceph::buffer::error& err) {
b3b6e05e 156 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
157 << ": failed to decode data changes log entry: "
158 << err.what() << dendl;
159 return -EIO;
160 }
161 entries.push_back(log_entry);
162 }
163 return 0;
164 }
b3b6e05e 165 int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
f67539c2
TL
166 cls_log_header header;
167 lr::ObjectReadOperation op;
168 cls_log_info(op, &header);
b3b6e05e 169 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
f67539c2
TL
170 if (r == -ENOENT) r = 0;
171 if (r < 0) {
b3b6e05e 172 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
173 << ": failed to get info from " << oids[index]
174 << cpp_strerror(-r) << dendl;
175 } else {
176 info->marker = header.max_marker;
177 info->last_update = header.max_time.to_real_time();
178 }
179 return r;
180 }
b3b6e05e 181 int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
f67539c2
TL
182 lr::ObjectWriteOperation op;
183 cls_log_trim(op, {}, {}, {}, std::string(marker));
b3b6e05e 184 auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
f67539c2
TL
185 if (r == -ENOENT) r = -ENODATA;
186 if (r < 0 && r != -ENODATA) {
b3b6e05e 187 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
188 << ": failed to get info from " << oids[index]
189 << cpp_strerror(-r) << dendl;
190 }
191 return r;
192 }
b3b6e05e 193 int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
f67539c2
TL
194 lr::AioCompletion* c) override {
195 lr::ObjectWriteOperation op;
196 cls_log_trim(op, {}, {}, {}, std::string(marker));
197 auto r = ioctx.aio_operate(oids[index], c, &op, 0);
198 if (r == -ENOENT) r = -ENODATA;
199 if (r < 0) {
20effc67 200 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
201 << ": failed to get info from " << oids[index]
202 << cpp_strerror(-r) << dendl;
203 }
204 return r;
205 }
206 std::string_view max_marker() const override {
20effc67 207 return "99999999";
f67539c2 208 }
b3b6e05e 209 int is_empty(const DoutPrefixProvider *dpp) override {
f67539c2
TL
210 for (auto shard = 0u; shard < oids.size(); ++shard) {
211 std::list<cls_log_entry> log_entries;
212 lr::ObjectReadOperation op;
213 std::string out_marker;
214 bool truncated;
215 cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
b3b6e05e 216 auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield);
f67539c2
TL
217 if (r == -ENOENT) {
218 continue;
219 }
220 if (r < 0) {
b3b6e05e 221 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
222 << ": failed to list " << oids[shard]
223 << cpp_strerror(-r) << dendl;
224 return r;
225 }
226 if (!log_entries.empty()) {
227 return 0;
228 }
229 }
230 return 1;
231 }
232};
233
234class RGWDataChangesFIFO final : public RGWDataChangesBE {
235 using centries = std::vector<ceph::buffer::list>;
236 tiny_vector<LazyFIFO> fifos;
237
238public:
239 RGWDataChangesFIFO(lr::IoCtx& ioctx,
240 RGWDataChangesLog& datalog,
241 uint64_t gen_id, int shards)
242 : RGWDataChangesBE(ioctx, datalog, gen_id),
243 fifos(shards, [&ioctx, this](std::size_t i, auto emplacer) {
244 emplacer.emplace(ioctx, get_oid(i));
245 }) {}
246 ~RGWDataChangesFIFO() override = default;
247 void prepare(ceph::real_time, const std::string&,
248 ceph::buffer::list&& entry, entries& out) override {
249 if (!std::holds_alternative<centries>(out)) {
250 ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
251 out = centries();
252 }
253 std::get<centries>(out).push_back(std::move(entry));
254 }
b3b6e05e
TL
255 int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
256 auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield);
f67539c2 257 if (r < 0) {
b3b6e05e 258 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
259 << ": unable to push to FIFO: " << get_oid(index)
260 << ": " << cpp_strerror(-r) << dendl;
261 }
262 return r;
263 }
b3b6e05e 264 int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
f67539c2
TL
265 const std::string&,
266 ceph::buffer::list&& bl) override {
b3b6e05e 267 auto r = fifos[index].push(dpp, std::move(bl), null_yield);
f67539c2 268 if (r < 0) {
b3b6e05e 269 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
270 << ": unable to push to FIFO: " << get_oid(index)
271 << ": " << cpp_strerror(-r) << dendl;
272 }
273 return r;
274 }
b3b6e05e 275 int list(const DoutPrefixProvider *dpp, int index, int max_entries,
f67539c2
TL
276 std::vector<rgw_data_change_log_entry>& entries,
277 std::optional<std::string_view> marker,
278 std::string* out_marker, bool* truncated) override {
279 std::vector<rgw::cls::fifo::list_entry> log_entries;
280 bool more = false;
b3b6e05e 281 auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
f67539c2
TL
282 null_yield);
283 if (r < 0) {
b3b6e05e 284 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
285 << ": unable to list FIFO: " << get_oid(index)
286 << ": " << cpp_strerror(-r) << dendl;
287 return r;
288 }
289 for (const auto& entry : log_entries) {
290 rgw_data_change_log_entry log_entry;
291 log_entry.log_id = entry.marker;
292 log_entry.log_timestamp = entry.mtime;
293 auto liter = entry.data.cbegin();
294 try {
295 decode(log_entry.entry, liter);
296 } catch (const buffer::error& err) {
b3b6e05e 297 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
298 << ": failed to decode data changes log entry: "
299 << err.what() << dendl;
300 return -EIO;
301 }
302 entries.push_back(std::move(log_entry));
303 }
304 if (truncated)
305 *truncated = more;
306 if (out_marker && !log_entries.empty()) {
307 *out_marker = log_entries.back().marker;
308 }
309 return 0;
310 }
b3b6e05e 311 int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
f67539c2 312 auto& fifo = fifos[index];
b3b6e05e 313 auto r = fifo.read_meta(dpp, null_yield);
f67539c2 314 if (r < 0) {
b3b6e05e 315 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
316 << ": unable to get FIFO metadata: " << get_oid(index)
317 << ": " << cpp_strerror(-r) << dendl;
318 return r;
319 }
320 rados::cls::fifo::info m;
b3b6e05e 321 fifo.meta(dpp, m, null_yield);
f67539c2
TL
322 auto p = m.head_part_num;
323 if (p < 0) {
20effc67 324 info->marker = "";
f67539c2
TL
325 info->last_update = ceph::real_clock::zero();
326 return 0;
327 }
328 rgw::cls::fifo::part_info h;
b3b6e05e 329 r = fifo.get_part_info(dpp, p, &h, null_yield);
f67539c2 330 if (r < 0) {
b3b6e05e 331 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
332 << ": unable to get part info: " << get_oid(index) << "/" << p
333 << ": " << cpp_strerror(-r) << dendl;
334 return r;
335 }
336 info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
337 info->last_update = h.max_time;
338 return 0;
339 }
b3b6e05e
TL
340 int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
341 auto r = fifos[index].trim(dpp, marker, false, null_yield);
f67539c2 342 if (r < 0) {
b3b6e05e 343 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
344 << ": unable to trim FIFO: " << get_oid(index)
345 << ": " << cpp_strerror(-r) << dendl;
346 }
347 return r;
348 }
b3b6e05e 349 int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
f67539c2
TL
350 librados::AioCompletion* c) override {
351 int r = 0;
352 if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
353 rgw_complete_aio_completion(c, -ENODATA);
354 } else {
b3b6e05e 355 fifos[index].trim(dpp, marker, false, c, null_yield);
f67539c2
TL
356 }
357 return r;
358 }
359 std::string_view max_marker() const override {
360 static const std::string mm =
361 rgw::cls::fifo::marker::max().to_string();
362 return std::string_view(mm);
363 }
b3b6e05e 364 int is_empty(const DoutPrefixProvider *dpp) override {
f67539c2
TL
365 std::vector<rgw::cls::fifo::list_entry> log_entries;
366 bool more = false;
367 for (auto shard = 0u; shard < fifos.size(); ++shard) {
b3b6e05e 368 auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more,
f67539c2
TL
369 null_yield);
370 if (r < 0) {
b3b6e05e 371 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
372 << ": unable to list FIFO: " << get_oid(shard)
373 << ": " << cpp_strerror(-r) << dendl;
374 return r;
375 }
376 if (!log_entries.empty()) {
377 return 0;
378 }
379 }
380 return 1;
381 }
382};
383
384RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
385 : cct(cct),
386 num_shards(cct->_conf->rgw_data_log_num_shards),
387 prefix(get_prefix()),
388 changes(cct->_conf->rgw_data_log_changes_size) {}
389
390bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
391 std::unique_lock l(m);
392
393 for (const auto& [gen_id, gen] : e) {
394 if (gen.pruned) {
395 lderr(datalog.cct)
396 << __PRETTY_FUNCTION__ << ":" << __LINE__
397 << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
398 }
399 if (count(gen_id) != 0) {
400 lderr(datalog.cct)
401 << __PRETTY_FUNCTION__ << ":" << __LINE__
402 << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
403 }
404 try {
405 switch (gen.type) {
406 case log_type::omap:
407 emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
408 break;
409 case log_type::fifo:
410 emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
411 break;
412 default:
413 lderr(datalog.cct)
414 << __PRETTY_FUNCTION__ << ":" << __LINE__
415 << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
416 << ", type" << gen.type << dendl;
417 return bs::error_code(EFAULT, bs::system_category());
418 }
419 } catch (const bs::system_error& err) {
420 lderr(datalog.cct)
421 << __PRETTY_FUNCTION__ << ":" << __LINE__
422 << ": error setting up backend: gen_id=" << gen_id
423 << ", err=" << err.what() << dendl;
424 return err.code();
425 }
426 }
427 return {};
428}
429bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
430 return handle_init(std::move(e));
431}
432bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
433 std::unique_lock l(m);
434 auto i = cbegin();
435 if (i->first < new_tail) {
436 return {};
437 }
438 if (new_tail >= (cend() - 1)->first) {
439 lderr(datalog.cct)
440 << __PRETTY_FUNCTION__ << ":" << __LINE__
441 << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
442 return bs::error_code(EFAULT, bs::system_category());
443 }
444 erase(i, upper_bound(new_tail));
445 return {};
446}
447
448
20effc67 449int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
f67539c2
TL
450 const RGWZoneParams& zoneparams,
451 librados::Rados* lr)
452{
453 zone = _zone;
454 ceph_assert(zone);
455 auto defbacking = to_log_type(
456 cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
457 // Should be guaranteed by `set_enum_allowed`
458 ceph_assert(defbacking);
459 auto log_pool = zoneparams.log_pool;
b3b6e05e 460 auto r = rgw_init_ioctx(dpp, lr, log_pool, ioctx, true, false);
f67539c2 461 if (r < 0) {
b3b6e05e 462 ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
f67539c2
TL
463 << ": Failed to initialized ioctx, r=" << r
464 << ", pool=" << log_pool << dendl;
465 return -r;
466 }
467
468 auto besr = logback_generations::init<DataLogBackends>(
b3b6e05e 469 dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
f67539c2
TL
470 return get_oid(gen_id, shard);
471 },
472 num_shards, *defbacking, null_yield, *this);
473
474
475 if (!besr) {
20effc67 476 lderr(cct) << __PRETTY_FUNCTION__
f67539c2
TL
477 << ": Error initializing backends: "
478 << besr.error().message() << dendl;
479 return ceph::from_error_code(besr.error());
480 }
481
482 bes = std::move(*besr);
483 renew_thread = make_named_thread("rgw_dt_lg_renew",
484 &RGWDataChangesLog::renew_run, this);
485 return 0;
486}
487
488int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
489 const auto& name = bs.bucket.name;
490 auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
491 auto r = (ceph_str_hash_linux(name.data(), name.size()) +
492 shard_shift) % num_shards;
493 return static_cast<int>(r);
494}
495
b3b6e05e 496int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
f67539c2
TL
497{
498 if (!zone->log_data)
499 return 0;
500
501 /* we can't keep the bucket name as part of the cls_log_entry, and we need
502 * it later, so we keep two lists under the map */
503 bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
504 RGWDataChangesBE::entries>> m;
505
506 std::unique_lock l(lock);
507 decltype(cur_cycle) entries;
508 entries.swap(cur_cycle);
509 l.unlock();
510
511 auto ut = real_clock::now();
512 auto be = bes->head();
513 for (const auto& bs : entries) {
514 auto index = choose_oid(bs);
515
516 rgw_data_change change;
517 bufferlist bl;
518 change.entity_type = ENTITY_TYPE_BUCKET;
519 change.key = bs.get_key();
520 change.timestamp = ut;
521 encode(change, bl);
522
523 m[index].first.push_back(bs);
524 be->prepare(ut, change.key, std::move(bl), m[index].second);
525 }
526
527 for (auto& [index, p] : m) {
528 auto& [buckets, entries] = p;
529
530 auto now = real_clock::now();
531
b3b6e05e 532 auto ret = be->push(dpp, index, std::move(entries));
f67539c2
TL
533 if (ret < 0) {
534 /* we don't really need to have a special handling for failed cases here,
535 * as this is just an optimization. */
b3b6e05e 536 ldpp_dout(dpp, -1) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
f67539c2
TL
537 return ret;
538 }
539
540 auto expiration = now;
541 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
542 for (auto& bs : buckets) {
543 update_renewed(bs, expiration);
544 }
545 }
546
547 return 0;
548}
549
550void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
551 ChangeStatusPtr& status)
552{
553 ceph_assert(ceph_mutex_is_locked(lock));
554 if (!changes.find(bs, status)) {
555 status = ChangeStatusPtr(new ChangeStatus);
556 changes.add(bs, status);
557 }
558}
559
560void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
561{
562 std::scoped_lock l{lock};
563 cur_cycle.insert(bs);
564}
565
566void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
567 real_time expiration)
568{
569 std::scoped_lock l{lock};
570 ChangeStatusPtr status;
571 _get_change(bs, status);
572
573 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
574 << bs.bucket.name << " shard_id=" << bs.shard_id
575 << " expiration=" << expiration << dendl;
576 status->cur_expiration = expiration;
577}
578
579int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
580 rgw_bucket_shard bs(bucket, shard_id);
581 return choose_oid(bs);
582}
583
b3b6e05e
TL
584bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
585 const rgw_bucket& bucket,
f67539c2
TL
586 optional_yield y) const
587{
588 if (!bucket_filter) {
589 return true;
590 }
591
b3b6e05e 592 return bucket_filter(bucket, y, dpp);
f67539c2
TL
593}
594
595std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
596 return (gen_id > 0 ?
597 fmt::format("{}@G{}.{}", prefix, gen_id, i) :
598 fmt::format("{}.{}", prefix, i));
599}
600
b3b6e05e 601int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
f67539c2
TL
602 auto& bucket = bucket_info.bucket;
603
b3b6e05e 604 if (!filter_bucket(dpp, bucket, null_yield)) {
f67539c2
TL
605 return 0;
606 }
607
608 if (observer) {
609 observer->on_bucket_changed(bucket.get_key());
610 }
611
612 rgw_bucket_shard bs(bucket, shard_id);
613
614 int index = choose_oid(bs);
615 mark_modified(index, bs);
616
617 std::unique_lock l(lock);
618
619 ChangeStatusPtr status;
620 _get_change(bs, status);
621 l.unlock();
622
623 auto now = real_clock::now();
624
625 std::unique_lock sl(status->lock);
626
b3b6e05e 627 ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
f67539c2
TL
628 << " shard_id=" << shard_id << " now=" << now
629 << " cur_expiration=" << status->cur_expiration << dendl;
630
631 if (now < status->cur_expiration) {
632 /* no need to send, recently completed */
633 sl.unlock();
634 register_renew(bs);
635 return 0;
636 }
637
638 RefCountedCond* cond;
639
640 if (status->pending) {
641 cond = status->cond;
642
643 ceph_assert(cond);
644
645 status->cond->get();
646 sl.unlock();
647
648 int ret = cond->wait();
649 cond->put();
650 if (!ret) {
651 register_renew(bs);
652 }
653 return ret;
654 }
655
656 status->cond = new RefCountedCond;
657 status->pending = true;
658
659 ceph::real_time expiration;
660
661 int ret;
662
663 do {
664 status->cur_sent = now;
665
666 expiration = now;
667 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
668
669 sl.unlock();
670
671 ceph::buffer::list bl;
672 rgw_data_change change;
673 change.entity_type = ENTITY_TYPE_BUCKET;
674 change.key = bs.get_key();
675 change.timestamp = now;
676 encode(change, bl);
677
b3b6e05e 678 ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
f67539c2
TL
679
680 auto be = bes->head();
b3b6e05e 681 ret = be->push(dpp, index, now, change.key, std::move(bl));
f67539c2
TL
682
683 now = real_clock::now();
684
685 sl.lock();
686
687 } while (!ret && real_clock::now() > expiration);
688
689 cond = status->cond;
690
691 status->pending = false;
692 /* time of when operation started, not completed */
693 status->cur_expiration = status->cur_sent;
694 status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
695 status->cond = nullptr;
696 sl.unlock();
697
698 cond->done(ret);
699 cond->put();
700
701 return ret;
702}
703
b3b6e05e 704int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2 705 std::vector<rgw_data_change_log_entry>& entries,
522d829b
TL
706 std::string_view marker,
707 std::string* out_marker,
708 bool* truncated)
f67539c2 709{
522d829b 710 const auto [start_id, start_cursor] = cursorgen(marker);
f67539c2
TL
711 auto gen_id = start_id;
712 std::string out_cursor;
713 while (max_entries > 0) {
714 std::vector<rgw_data_change_log_entry> gentries;
715 std::unique_lock l(m);
716 auto i = lower_bound(gen_id);
717 if (i == end()) return 0;
718 auto be = i->second;
719 l.unlock();
720 gen_id = be->gen_id;
b3b6e05e 721 auto r = be->list(dpp, shard, max_entries, gentries,
f67539c2
TL
722 gen_id == start_id ? start_cursor : std::string{},
723 &out_cursor, truncated);
724 if (r < 0)
725 return r;
726
727 if (out_marker && !out_cursor.empty()) {
728 *out_marker = gencursor(gen_id, out_cursor);
729 }
730 for (auto& g : gentries) {
731 g.log_id = gencursor(gen_id, g.log_id);
732 }
522d829b 733 if (int s = gentries.size(); s < 0 || s > max_entries)
f67539c2
TL
734 max_entries = 0;
735 else
736 max_entries -= gentries.size();
737
738 std::move(gentries.begin(), gentries.end(),
739 std::back_inserter(entries));
740 ++gen_id;
741 }
742 return 0;
743}
744
b3b6e05e 745int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
f67539c2 746 std::vector<rgw_data_change_log_entry>& entries,
522d829b 747 std::string_view marker,
f67539c2
TL
748 std::string* out_marker, bool* truncated)
749{
750 assert(shard < num_shards);
b3b6e05e 751 return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated);
f67539c2
TL
752}
753
b3b6e05e 754int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries,
f67539c2
TL
755 std::vector<rgw_data_change_log_entry>& entries,
756 LogMarker& marker, bool *ptruncated)
757{
758 bool truncated;
759 entries.clear();
760 for (; marker.shard < num_shards && int(entries.size()) < max_entries;
522d829b 761 marker.shard++, marker.marker.clear()) {
b3b6e05e 762 int ret = list_entries(dpp, marker.shard, max_entries - entries.size(),
f67539c2
TL
763 entries, marker.marker, NULL, &truncated);
764 if (ret == -ENOENT) {
765 continue;
766 }
767 if (ret < 0) {
768 return ret;
769 }
770 if (truncated) {
771 *ptruncated = true;
772 return 0;
773 }
774 }
775 *ptruncated = (marker.shard < num_shards);
776 return 0;
777}
778
b3b6e05e 779int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info)
f67539c2
TL
780{
781 assert(shard_id < num_shards);
782 auto be = bes->head();
b3b6e05e 783 auto r = be->get_info(dpp, shard_id, info);
f67539c2
TL
784 if (!info->marker.empty()) {
785 info->marker = gencursor(be->gen_id, info->marker);
786 }
787 return r;
788}
789
b3b6e05e 790int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
f67539c2
TL
791{
792 auto [target_gen, cursor] = cursorgen(marker);
793 std::unique_lock l(m);
794 const auto head_gen = (end() - 1)->second->gen_id;
795 const auto tail_gen = begin()->first;
796 if (target_gen < tail_gen) return 0;
797 auto r = 0;
798 for (auto be = lower_bound(0)->second;
799 be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
800 be = upper_bound(be->gen_id)->second) {
801 l.unlock();
802 auto c = be->gen_id == target_gen ? cursor : be->max_marker();
b3b6e05e 803 r = be->trim(dpp, shard_id, c);
f67539c2
TL
804 if (r == -ENOENT)
805 r = -ENODATA;
806 if (r == -ENODATA && be->gen_id < target_gen)
807 r = 0;
522d829b
TL
808 if (be->gen_id == target_gen)
809 break;
f67539c2
TL
810 l.lock();
811 };
812 return r;
813}
814
b3b6e05e 815int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
f67539c2
TL
816{
817 assert(shard_id < num_shards);
b3b6e05e 818 return bes->trim_entries(dpp, shard_id, marker);
f67539c2
TL
819}
820
821class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
822public:
823 DataLogBackends* const bes;
824 const int shard_id;
825 const uint64_t target_gen;
826 const std::string cursor;
827 const uint64_t head_gen;
828 const uint64_t tail_gen;
829 boost::intrusive_ptr<RGWDataChangesBE> be;
830
b3b6e05e 831 GenTrim(const DoutPrefixProvider *dpp, DataLogBackends* bes, int shard_id, uint64_t target_gen,
f67539c2
TL
832 std::string cursor, uint64_t head_gen, uint64_t tail_gen,
833 boost::intrusive_ptr<RGWDataChangesBE> be,
834 lr::AioCompletion* super)
b3b6e05e 835 : Completion(dpp, super), bes(bes), shard_id(shard_id), target_gen(target_gen),
f67539c2
TL
836 cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
837 be(std::move(be)) {}
838
b3b6e05e 839 void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
f67539c2
TL
840 auto gen_id = be->gen_id;
841 be.reset();
842 if (r == -ENOENT)
843 r = -ENODATA;
844 if (r == -ENODATA && gen_id < target_gen)
845 r = 0;
846 if (r < 0) {
847 complete(std::move(p), r);
848 return;
849 }
850
851 {
852 std::unique_lock l(bes->m);
853 auto i = bes->upper_bound(gen_id);
854 if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
855 l.unlock();
856 complete(std::move(p), -ENODATA);
857 return;
858 }
859 be = i->second;
860 }
861 auto c = be->gen_id == target_gen ? cursor : be->max_marker();
b3b6e05e 862 be->trim(dpp, shard_id, c, call(std::move(p)));
f67539c2
TL
863 }
864};
865
b3b6e05e 866void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2
TL
867 librados::AioCompletion* c)
868{
869 auto [target_gen, cursor] = cursorgen(marker);
870 std::unique_lock l(m);
871 const auto head_gen = (end() - 1)->second->gen_id;
872 const auto tail_gen = begin()->first;
873 if (target_gen < tail_gen) {
874 l.unlock();
875 rgw_complete_aio_completion(c, -ENODATA);
876 return;
877 }
878 auto be = begin()->second;
879 l.unlock();
b3b6e05e 880 auto gt = std::make_unique<GenTrim>(dpp, this, shard_id, target_gen,
f67539c2
TL
881 std::string(cursor), head_gen, tail_gen,
882 be, c);
883
884 auto cc = be->gen_id == target_gen ? cursor : be->max_marker();
b3b6e05e 885 be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt)));
f67539c2
TL
886}
887
b3b6e05e 888int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
f67539c2
TL
889 if (size() != 1) {
890 std::vector<mapped_type> candidates;
891 {
892 std::scoped_lock l(m);
893 auto e = cend() - 1;
894 for (auto i = cbegin(); i < e; ++i) {
895 candidates.push_back(i->second);
896 }
897 }
898
899 std::optional<uint64_t> highest;
900 for (auto& be : candidates) {
b3b6e05e 901 auto r = be->is_empty(dpp);
f67539c2
TL
902 if (r < 0) {
903 return r;
904 } else if (r == 1) {
905 highest = be->gen_id;
906 } else {
907 break;
908 }
909 }
910
911 through = highest;
912 if (!highest) {
913 return 0;
914 }
b3b6e05e 915 auto ec = empty_to(dpp, *highest, null_yield);
f67539c2
TL
916 if (ec) {
917 return ceph::from_error_code(ec);
918 }
919 }
920
b3b6e05e 921 return ceph::from_error_code(remove_empty(dpp, null_yield));
f67539c2
TL
922}
923
924
b3b6e05e 925int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
f67539c2
TL
926 librados::AioCompletion* c)
927{
928 assert(shard_id < num_shards);
b3b6e05e 929 bes->trim_entries(dpp, shard_id, marker, c);
f67539c2
TL
930 return 0;
931}
932
933bool RGWDataChangesLog::going_down() const
934{
935 return down_flag;
936}
937
938RGWDataChangesLog::~RGWDataChangesLog() {
939 down_flag = true;
940 if (renew_thread.joinable()) {
941 renew_stop();
942 renew_thread.join();
943 }
944}
945
20effc67 946void RGWDataChangesLog::renew_run() noexcept {
f67539c2
TL
947 static constexpr auto runs_per_prune = 150;
948 auto run = 0;
949 for (;;) {
b3b6e05e
TL
950 const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
951 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
952 int r = renew_entries(&dp);
f67539c2 953 if (r < 0) {
b3b6e05e 954 ldpp_dout(&dp, 0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
f67539c2
TL
955 }
956
957 if (going_down())
958 break;
959
960 if (run == runs_per_prune) {
961 std::optional<uint64_t> through;
b3b6e05e
TL
962 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
963 trim_generations(&dp, through);
f67539c2
TL
964 if (r < 0) {
965 derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
966 << r << dendl;
967 } else if (through) {
b3b6e05e 968 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruned generations "
f67539c2
TL
969 << "through " << *through << "." << dendl;
970 } else {
b3b6e05e 971 ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: nothing to prune."
f67539c2
TL
972 << dendl;
973 }
974 run = 0;
975 } else {
976 ++run;
977 }
978
979 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
980 std::unique_lock locker{renew_lock};
981 renew_cond.wait_for(locker, std::chrono::seconds(interval));
982 }
983}
984
985void RGWDataChangesLog::renew_stop()
986{
987 std::lock_guard l{renew_lock};
988 renew_cond.notify_all();
989}
990
991void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
992{
20effc67
TL
993 if (!cct->_conf->rgw_data_notify_interval_msec) {
994 return;
995 }
996
f67539c2
TL
997 auto key = bs.get_key();
998 {
999 std::shared_lock rl{modified_lock}; // read lock to check for existence
1000 auto shard = modified_shards.find(shard_id);
1001 if (shard != modified_shards.end() && shard->second.count(key)) {
1002 return;
1003 }
1004 }
1005
1006 std::unique_lock wl{modified_lock}; // write lock for insertion
1007 modified_shards[shard_id].insert(key);
1008}
1009
1010std::string RGWDataChangesLog::max_marker() const {
1011 return gencursor(std::numeric_limits<uint64_t>::max(),
1012 "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
1013}
1014
b3b6e05e
TL
1015int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) {
1016 return ceph::from_error_code(bes->new_backing(dpp, type, y));
f67539c2
TL
1017}
1018
b3b6e05e
TL
1019int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
1020 return bes->trim_generations(dpp, through);
f67539c2 1021}
20effc67
TL
1022
1023void RGWDataChangesLogInfo::dump(Formatter *f) const
1024{
1025 encode_json("marker", marker, f);
1026 utime_t ut(last_update);
1027 encode_json("last_update", ut, f);
1028}
1029
1030void RGWDataChangesLogInfo::decode_json(JSONObj *obj)
1031{
1032 JSONDecoder::decode_json("marker", marker, obj);
1033 utime_t ut;
1034 JSONDecoder::decode_json("last_update", ut, obj);
1035 last_update = ut.to_real_time();
1036}
1037
1038