// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#include <mutex>
#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>

#include "include/scope_guard.h"
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_sync_log_trim.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_sync.h"

#include "services/svc_zone.h"

#include <boost/asio/yield.hpp>
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "trim: ")

using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;

const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;


// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0,
  NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);
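
// Illustrative sketch of the wire format used below (not part of the original
// source): a notification payload is the raw-encoded TrimNotifyType followed
// by the request body for that type, e.g. BucketTrimCR::operate() builds a
// counters request roughly as
//   bufferlist bl;
//   encode(type, bl);     // type = NotifyTrimCounters, raw encoding
//   encode(request, bl);  // TrimCounters::Request{32}, versioned encoding
// and BucketTrimWatcher::handle_notify() decodes the type first in order to
// dispatch the remainder of the payload to the matching TrimNotifyHandler.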

struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};

/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
{
  return out << rhs.bucket << ":" << rhs.count;
}

void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  using ceph::encode;
  // no versioning to save space
  encode(bucket, bl);
  encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
  using ceph::decode;
  decode(bucket, p);
  decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);

void TrimCounters::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);

void TrimCounters::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);

void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);
  auto count = std::min<uint16_t>(request.max_buckets, 128);

  Response response;
  server->get_bucket_counters(count, response.bucket_counters);
  encode(response, output);
}

/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};

void TrimComplete::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);

void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);

  server->reset_bucket_counters();

  Response response;
  encode(response, output);
}


/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
  RGWRados *const store;
  const rgw_raw_obj& obj;
  rgw_rados_ref ref;
  uint64_t handle{0};

  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

 public:
  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
  }

  ~BucketTrimWatcher() {
    stop();
  }

  int start() {
    int r = store->get_raw_obj_ref(obj, &ref);
    if (r < 0) {
      return r;
    }

    // register a watch on the realm's control object
    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
    if (r == -ENOENT) {
      constexpr bool exclusive = true;
      r = ref.ioctx.create(ref.obj.oid, exclusive);
      if (r == -EEXIST || r == 0) {
        r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
      }
    }
    if (r < 0) {
      lderr(store->ctx()) << "Failed to watch " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.ioctx.close();
      return r;
    }

    ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
    return 0;
  }

  int restart() {
    int r = ref.ioctx.unwatch2(handle);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
    }
    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.ioctx.close();
    }
    return r;
  }

  void stop() {
    if (handle) {
      ref.ioctx.unwatch2(handle);
      ref.ioctx.close();
    }
  }

  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    if (cookie != handle) {
      return;
    }
    bufferlist reply;
    try {
      auto p = bl.cbegin();
      TrimNotifyType type;
      decode(type, p);

      auto handler = handlers.find(type);
      if (handler != handlers.end()) {
        handler->second->handle(p, reply);
      } else {
        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
      }
    } catch (const buffer::error& e) {
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    }
    ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply);
  }

  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
      return;
    }
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
      restart();
    }
  }
};


/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
};

/// populate the status with the minimum stable marker of each shard
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  for (auto peer = first; peer != last; ++peer) {
    if (peer->size() != status->size()) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // only consider incremental sync markers
      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}
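
// Worked example (illustrative, not from the original source): with two peers
// reporting incremental-sync markers {"00005", "00009"} and {"00007", "00003"}
// for a two-shard bucket, take_min_status() above leaves status at
// {"00005", "00003"} -- the smallest stable marker per shard, i.e. the
// furthest position to which each shard's bilog can safely be trimmed.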

/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  RGWRados *const store;
  const RGWBucketInfo& bucket_info;
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};

bool BucketTrimShardCollectCR::spawn_next()
{
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldout(cct, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
                                    std::string{}, marker),
            false);
      return true;
    }
  }
  return false;
}

/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
  int child_ret = 0;

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->svc.zone->get_zone().id),
      peer_status(store->svc.zone->get_zone_data_notify_to_map().size())
  {}

  int operate() override;
};

int BucketTrimInstanceCR::operate()
{
  reenter(this) {
    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    // query peers for sync status
    set_status("fetching sync status from peers");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "bucket-index" },
        { "status", nullptr },
        { "bucket", bucket_instance.c_str() },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };

      auto p = peer_status.begin();
      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
      // in parallel, read the local bucket instance info
      spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
                                           bucket_instance, &bucket_info),
            false);
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        drain_all();
        return set_cr_error(child_ret);
      }
    }

    // initialize each shard with the maximum marker, which is only used when
    // there are no peers syncing from us
    min_markers.assign(std::max(1u, bucket_info.num_shards),
                       RGWSyncLogTrimCR::max_marker);

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }

    // trim shards with a ShardCollectCR
    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}

/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  RGWRados *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
 public:
  BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end())
  {}
  bool spawn_next() override;
};

bool BucketTrimInstanceCollectCR::spawn_next()
{
  if (bucket == end) {
    return false;
  }
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
  ++bucket;
  return true;
}

/// correlate the replies from each peer gateway into the given counter
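/// (descriptive note: the payload decoded below is the reply bufferlist that
/// librados returns for a notify -- a map of acked replies keyed by each
/// responding watcher's (gid, cookie) pair, followed by the set of watchers
/// that timed out)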
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
{
  counter.clear();

  try {
    // decode notify responses
    auto p = bl.cbegin();
    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
    std::set<std::pair<uint64_t, uint64_t>> timeouts;
    decode(replies, p);
    decode(timeouts, p);

    for (auto& peer : replies) {
      auto q = peer.second.cbegin();
      TrimCounters::Response response;
      decode(response, q);
      for (const auto& b : response.bucket_counters) {
        counter.insert(b.bucket, b.count);
      }
    }
  } catch (const buffer::error& e) {
    return -EIO;
  }
  return 0;
}

/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;

/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;
  const std::string start_marker;
  MetadataListCallback callback;

  int _send_request() override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
};

int AsyncMetadataList::_send_request()
{
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  // start a listing at the given marker
  int r = mgr->list_keys_init(section, start_marker, &handle);
  if (r == -EINVAL) {
    // restart with empty marker below
  } else if (r < 0) {
    ldout(cct, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  } else {
    ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;

    // release the handle when scope exits
    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    do {
      // get the next key and marker
      r = mgr->list_keys_next(handle, 1, keys, &truncated);
      if (r < 0) {
        ldout(cct, 10) << "failed to list metadata: "
            << cpp_strerror(r) << dendl;
        return r;
      }
      marker = mgr->get_marker(handle);

      if (!keys.empty()) {
        ceph_assert(keys.size() == 1);
        auto& key = keys.front();
        if (!callback(std::move(key), std::move(marker))) {
          return 0;
        }
      }
    } while (truncated);

    if (start_marker.empty()) {
      // already listed all keys
      return 0;
    }
  }

  // restart the listing from the beginning (empty marker)
  handle = nullptr;

  r = mgr->list_keys_init(section, "", &handle);
  if (r < 0) {
    ldout(cct, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldout(cct, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
  do {
    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
    if (r < 0) {
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker > start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}

/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr};
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request() override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish();
      req = nullptr;
    }
  }
};

class BucketTrimCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  ceph::mono_time start_time;
  bufferlist notify_replies;
  BucketChangeCounter counter;
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size)
  {}

  int operate() override;
};

const std::string BucketTrimCR::section{"bucket.instance"};

int BucketTrimCR::operate()
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldout(cct, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        encode(type, bl);
        encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

      const int max_count = config.buckets_per_interval -
                            config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
        [this] (const std::string& bucket, int count) {
          buckets.push_back(bucket);
        });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldout(cct, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldout(cct, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      encode(type, bl);
      encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldout(cct, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}

class BucketTrimPollCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  const std::string name{"trim"}; //< lock name
  const std::string cookie;

 public:
  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate() override;
};

int BucketTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}

/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
 public:
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)
  {}

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // ceph_assert(events.empty() || now >= events.back().time)
    events.push_back(Event{std::move(value), now});
  }

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {
        return true;
      }
    }
    return false;
  }

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    while (!events.empty() && events.front().time < expired_before) {
      events.pop_front();
    }
  }

 private:
  struct Event {
    T value;
    time_point time;
  };
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
};
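
// Illustrative usage (mirrors BucketTrimManager::Impl below, which tracks
// recently trimmed bucket instances this way; the key string is hypothetical):
//   RecentEventList<std::string> recent(128, std::chrono::hours(2));
//   recent.insert("bucket:instance-id", ceph::coarse_mono_clock::now());
//   if (recent.lookup("bucket:instance-id")) { /* trimmed recently, skip */ }
//   recent.expire_old(ceph::coarse_mono_clock::now());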

namespace rgw {

// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
  const auto& conf = cct->_conf;

  config.trim_interval_sec =
      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
  config.counter_size = 512;
  config.buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
  config.min_cold_buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
  config.concurrent_buckets =
      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
  config.notify_timeout_ms = 10000;
  config.recent_size = 128;
  config.recent_duration = std::chrono::hours(2);
}

class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
 public:
  RGWRados *const store;
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj;

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads
  std::mutex mutex;

  Impl(RGWRados *store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)
  {}

  /// TrimCounters::Server interface for watch/notify api
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
        buckets.emplace_back(key, count);
      });
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
  }

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    counter.clear();
    trimmed.expire_old(clock_type::now());
  }

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());
  }

  bool trimmed_recently(const boost::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
  }
};

BucketTrimManager::BucketTrimManager(RGWRados *store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}
BucketTrimManager::~BucketTrimManager() = default;

int BucketTrimManager::init()
{
  return impl->watcher.start();
}

void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
{
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {
    return;
  }
  impl->counter.insert(bucket.to_string());
}

RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj);
}

RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj);
}

} // namespace rgw