1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * Author: Casey Bodley <cbodley@redhat.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 */
15
16 #include <mutex>
17 #include <boost/circular_buffer.hpp>
18 #include <boost/container/flat_map.hpp>
19
20 #include "include/scope_guard.h"
21 #include "common/bounded_key_counter.h"
22 #include "common/errno.h"
23 #include "rgw_sync_log_trim.h"
24 #include "rgw_cr_rados.h"
25 #include "rgw_cr_rest.h"
26 #include "rgw_data_sync.h"
27 #include "rgw_metadata.h"
28 #include "rgw_rados.h"
29 #include "rgw_sync.h"
30
31 #include <boost/asio/yield.hpp>
32 #include "include/assert.h"
33
34 #define dout_subsys ceph_subsys_rgw
35
36 #undef dout_prefix
37 #define dout_prefix (*_dout << "trim: ")
38
39 using rgw::BucketTrimConfig;
40 using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
41
42 const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
43 using rgw::BucketTrimStatus;
44
45
46 // watch/notify api for gateways to coordinate about which buckets to trim
47 enum TrimNotifyType {
48 NotifyTrimCounters = 0,
49 NotifyTrimComplete,
50 };
51 WRITE_RAW_ENCODER(TrimNotifyType);
52
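// Each notify payload begins with an encoded TrimNotifyType tag followed by
// the request body for that type; the matching TrimNotifyHandler decodes the
// remainder of the payload and encodes its response into the notify ack
// (see BucketTrimWatcher::handle_notify() below).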
53 struct TrimNotifyHandler {
54 virtual ~TrimNotifyHandler() = default;
55
56 virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
57 };
58
59 /// api to share the bucket trim counters between gateways in the same zone.
60 /// each gateway will process different datalog shards, so the gateway that runs
61 /// the trim process needs to accumulate their counters
62 struct TrimCounters {
63 /// counter for a single bucket
64 struct BucketCounter {
65 std::string bucket; //< bucket instance metadata key
66 int count{0};
67
68 BucketCounter() = default;
69 BucketCounter(const std::string& bucket, int count)
70 : bucket(bucket), count(count) {}
71
72 void encode(bufferlist& bl) const;
73 void decode(bufferlist::iterator& p);
74 };
75 using Vector = std::vector<BucketCounter>;
76
77 /// request bucket trim counters from peer gateways
78 struct Request {
79 uint16_t max_buckets; //< maximum number of bucket counters to return
80
81 void encode(bufferlist& bl) const;
82 void decode(bufferlist::iterator& p);
83 };
84
85 /// return the current bucket trim counters
86 struct Response {
87 Vector bucket_counters;
88
89 void encode(bufferlist& bl) const;
90 void decode(bufferlist::iterator& p);
91 };
92
93 /// server interface to query the hottest buckets
94 struct Server {
95 virtual ~Server() = default;
96
97 virtual void get_bucket_counters(int count, Vector& counters) = 0;
98 virtual void reset_bucket_counters() = 0;
99 };
100
101 /// notify handler
102 class Handler : public TrimNotifyHandler {
103 Server *const server;
104 public:
105 Handler(Server *server) : server(server) {}
106
107 void handle(bufferlist::iterator& input, bufferlist& output) override;
108 };
109 };
110 std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
111 {
112 return out << rhs.bucket << ":" << rhs.count;
113 }
114
115 void TrimCounters::BucketCounter::encode(bufferlist& bl) const
116 {
117 // no versioning to save space
118 ::encode(bucket, bl);
119 ::encode(count, bl);
120 }
121 void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)
122 {
123 ::decode(bucket, p);
124 ::decode(count, p);
125 }
126 WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
127
128 void TrimCounters::Request::encode(bufferlist& bl) const
129 {
130 ENCODE_START(1, 1, bl);
131 ::encode(max_buckets, bl);
132 ENCODE_FINISH(bl);
133 }
134 void TrimCounters::Request::decode(bufferlist::iterator& p)
135 {
136 DECODE_START(1, p);
137 ::decode(max_buckets, p);
138 DECODE_FINISH(p);
139 }
140 WRITE_CLASS_ENCODER(TrimCounters::Request);
141
142 void TrimCounters::Response::encode(bufferlist& bl) const
143 {
144 ENCODE_START(1, 1, bl);
145 ::encode(bucket_counters, bl);
146 ENCODE_FINISH(bl);
147 }
148 void TrimCounters::Response::decode(bufferlist::iterator& p)
149 {
150 DECODE_START(1, p);
151 ::decode(bucket_counters, p);
152 DECODE_FINISH(p);
153 }
154 WRITE_CLASS_ENCODER(TrimCounters::Response);
155
156 void TrimCounters::Handler::handle(bufferlist::iterator& input,
157 bufferlist& output)
158 {
159 Request request;
160 ::decode(request, input);
161 auto count = std::min<uint16_t>(request.max_buckets, 128);
162
163 Response response;
164 server->get_bucket_counters(count, response.bucket_counters);
165 ::encode(response, output);
166 }
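// Illustrative sketch of the counters round trip (it mirrors what
// BucketTrimCR::operate() below actually sends):
//
//   bufferlist bl;
//   TrimNotifyType type = NotifyTrimCounters;
//   TrimCounters::Request request{32};   // ask each peer for up to 32 buckets
//   ::encode(type, bl);
//   ::encode(request, bl);
//   // broadcast bl via watch/notify on the trim status object; each peer's
//   // reply bufferlist then decodes as a TrimCounters::Response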
167
168 /// api to notify peer gateways that trim has completed and their bucket change
169 /// counters can be reset
170 struct TrimComplete {
171 struct Request {
172 void encode(bufferlist& bl) const;
173 void decode(bufferlist::iterator& p);
174 };
175 struct Response {
176 void encode(bufferlist& bl) const;
177 void decode(bufferlist::iterator& p);
178 };
179
180 /// server interface to reset bucket counters
181 using Server = TrimCounters::Server;
182
183 /// notify handler
184 class Handler : public TrimNotifyHandler {
185 Server *const server;
186 public:
187 Handler(Server *server) : server(server) {}
188
189 void handle(bufferlist::iterator& input, bufferlist& output) override;
190 };
191 };
192
193 void TrimComplete::Request::encode(bufferlist& bl) const
194 {
195 ENCODE_START(1, 1, bl);
196 ENCODE_FINISH(bl);
197 }
198 void TrimComplete::Request::decode(bufferlist::iterator& p)
199 {
200 DECODE_START(1, p);
201 DECODE_FINISH(p);
202 }
203 WRITE_CLASS_ENCODER(TrimComplete::Request);
204
205 void TrimComplete::Response::encode(bufferlist& bl) const
206 {
207 ENCODE_START(1, 1, bl);
208 ENCODE_FINISH(bl);
209 }
210 void TrimComplete::Response::decode(bufferlist::iterator& p)
211 {
212 DECODE_START(1, p);
213 DECODE_FINISH(p);
214 }
215 WRITE_CLASS_ENCODER(TrimComplete::Response);
216
217 void TrimComplete::Handler::handle(bufferlist::iterator& input,
218 bufferlist& output)
219 {
220 Request request;
221 ::decode(request, input);
222
223 server->reset_bucket_counters();
224
225 Response response;
226 ::encode(response, output);
227 }
228
229
230 /// rados watcher for bucket trim notifications
231 class BucketTrimWatcher : public librados::WatchCtx2 {
232 RGWRados *const store;
233 const rgw_raw_obj& obj;
234 rgw_rados_ref ref;
235 uint64_t handle{0};
236
237 using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
238 boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
239
240 public:
241 BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
242 TrimCounters::Server *counters)
243 : store(store), obj(obj) {
244 handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
245 handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
246 }
247
248 ~BucketTrimWatcher() {
249 stop();
250 }
251
252 int start() {
253 int r = store->get_raw_obj_ref(obj, &ref);
254 if (r < 0) {
255 return r;
256 }
257
258 // register a watch on the realm's control object
259 r = ref.ioctx.watch2(ref.oid, &handle, this);
260 if (r == -ENOENT) {
261 constexpr bool exclusive = true;
262 r = ref.ioctx.create(ref.oid, exclusive);
263 if (r == -EEXIST || r == 0) {
264 r = ref.ioctx.watch2(ref.oid, &handle, this);
265 }
266 }
267 if (r < 0) {
268 lderr(store->ctx()) << "Failed to watch " << ref.oid
269 << " with " << cpp_strerror(-r) << dendl;
270 ref.ioctx.close();
271 return r;
272 }
273
274 ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;
275 return 0;
276 }
277
278 int restart() {
279 int r = ref.ioctx.unwatch2(handle);
280 if (r < 0) {
281 lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
282 << " with " << cpp_strerror(-r) << dendl;
283 }
284 r = ref.ioctx.watch2(ref.oid, &handle, this);
285 if (r < 0) {
286 lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
287 << " with " << cpp_strerror(-r) << dendl;
288 ref.ioctx.close();
289 }
290 return r;
291 }
292
293 void stop() {
294 if (handle) {
295 ref.ioctx.unwatch2(handle);
296 ref.ioctx.close();
297 }
298 }
299
300 /// respond to bucket trim notifications
301 void handle_notify(uint64_t notify_id, uint64_t cookie,
302 uint64_t notifier_id, bufferlist& bl) override {
303 if (cookie != handle) {
304 return;
305 }
306 bufferlist reply;
307 try {
308 auto p = bl.begin();
309 TrimNotifyType type;
310 ::decode(type, p);
311
312 auto handler = handlers.find(type);
313 if (handler != handlers.end()) {
314 handler->second->handle(p, reply);
315 } else {
316 lderr(store->ctx()) << "no handler for notify type " << type << dendl;
317 }
318 } catch (const buffer::error& e) {
319 lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
320 }
321 ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);
322 }
323
324 /// reestablish the watch if it gets disconnected
325 void handle_error(uint64_t cookie, int err) override {
326 if (cookie != handle) {
327 return;
328 }
329 if (err == -ENOTCONN) {
330 ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
331 restart();
332 }
333 }
334 };
335
336
337 /// Interface to communicate with the trim manager about completed operations
338 struct BucketTrimObserver {
339 virtual ~BucketTrimObserver() = default;
340
341 virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
342 virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
343 };
344
345 /// populate the status with the minimum stable marker of each shard
346 template <typename Iter>
347 int take_min_status(CephContext *cct, Iter first, Iter last,
348 std::vector<std::string> *status)
349 {
350 status->clear();
351 boost::optional<size_t> num_shards;
352 for (auto peer = first; peer != last; ++peer) {
353 const size_t peer_shards = peer->size();
354 if (!num_shards) {
355 num_shards = peer_shards;
356 status->resize(*num_shards);
357 } else if (*num_shards != peer_shards) {
358 // all peers must agree on the number of shards
359 return -EINVAL;
360 }
361 auto m = status->begin();
362 for (auto& shard : *peer) {
363 auto& marker = *m++;
364 // only consider incremental sync markers
365 if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
366 continue;
367 }
368 // always take the first marker, or any later marker that's smaller
369 if (peer == first || marker > shard.inc_marker.position) {
370 marker = std::move(shard.inc_marker.position);
371 }
372 }
373 }
374 return 0;
375 }
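// Example (illustrative markers): two peers, both in incremental sync on two
// shards
//   peer A: shard 0 = "00005", shard 1 = "00042"
//   peer B: shard 0 = "00003", shard 1 = "00050"
// yields {"00003", "00042"}, i.e. the smallest marker per shard. A shard that
// a peer still reports in full sync contributes nothing for that peer, so
// only incremental-sync positions can lower the result.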
376
377 /// trim each bilog shard to the given marker, while limiting the number of
378 /// concurrent requests
379 class BucketTrimShardCollectCR : public RGWShardCollectCR {
380 static constexpr int MAX_CONCURRENT_SHARDS = 16;
381 RGWRados *const store;
382 const RGWBucketInfo& bucket_info;
383 const std::vector<std::string>& markers; //< shard markers to trim
384 size_t i{0}; //< index of current shard marker
385 public:
386 BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
387 const std::vector<std::string>& markers)
388 : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
389 store(store), bucket_info(bucket_info), markers(markers)
390 {}
391 bool spawn_next() override;
392 };
393
394 bool BucketTrimShardCollectCR::spawn_next()
395 {
396 while (i < markers.size()) {
397 const auto& marker = markers[i];
398 const auto shard_id = i++;
399
400 // skip empty markers
401 if (!marker.empty()) {
402 ldout(cct, 10) << "trimming bilog shard " << shard_id
403 << " of " << bucket_info.bucket << " at marker " << marker << dendl;
404 spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
405 std::string{}, marker),
406 false);
407 return true;
408 }
409 }
410 return false;
411 }
412
413 /// trim the bilog of all of the given bucket instance's shards
414 class BucketTrimInstanceCR : public RGWCoroutine {
415 RGWRados *const store;
416 RGWHTTPManager *const http;
417 BucketTrimObserver *const observer;
418 std::string bucket_instance;
419 const std::string& zone_id; //< my zone id
420 RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
421
422 using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
423 std::vector<StatusShards> peer_status; //< sync status for each peer
424 std::vector<std::string> min_markers; //< min marker per shard
425
426 public:
427 BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
428 BucketTrimObserver *observer,
429 const std::string& bucket_instance)
430 : RGWCoroutine(store->ctx()), store(store),
431 http(http), observer(observer),
432 bucket_instance(bucket_instance),
433 zone_id(store->get_zone().id),
434 peer_status(store->zone_conn_map.size())
435 {}
436
437 int operate() override;
438 };
439
440 int BucketTrimInstanceCR::operate()
441 {
442 reenter(this) {
443 ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
444
445 // query peers for sync status
446 set_status("fetching sync status from peers");
447 yield {
448 // query data sync status from each sync peer
449 rgw_http_param_pair params[] = {
450 { "type", "bucket-index" },
451 { "status", nullptr },
452 { "bucket", bucket_instance.c_str() },
453 { "source-zone", zone_id.c_str() },
454 { nullptr, nullptr }
455 };
456
457 auto p = peer_status.begin();
458 for (auto& c : store->zone_conn_map) {
459 using StatusCR = RGWReadRESTResourceCR<StatusShards>;
460 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
461 false);
462 ++p;
463 }
464 // in parallel, read the local bucket instance info
465 spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
466 bucket_instance, &bucket_info),
467 false);
468 }
469 // wait for a response from each peer. all must respond to attempt trim
470 while (num_spawned()) {
471 int child_ret;
472 yield wait_for_child();
473 collect(&child_ret, nullptr);
474 if (child_ret < 0) {
475 drain_all();
476 return set_cr_error(child_ret);
477 }
478 }
479
480 // determine the minimum marker for each shard
481 retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
482 &min_markers);
483 if (retcode < 0) {
484 ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
485 return set_cr_error(retcode);
486 }
487
488 // trim shards with a ShardCollectCR
489 ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
490 << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
491 set_status("trimming bilog shards");
492 yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
493 // ENODATA just means there were no keys to trim
494 if (retcode == -ENODATA) {
495 retcode = 0;
496 }
497 if (retcode < 0) {
498 ldout(cct, 4) << "failed to trim bilog shards: "
499 << cpp_strerror(retcode) << dendl;
500 return set_cr_error(retcode);
501 }
502
503 observer->on_bucket_trimmed(std::move(bucket_instance));
504 return set_cr_done();
505 }
506 return 0;
507 }
508
509 /// trim each bucket instance while limiting the number of concurrent operations
510 class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
511 RGWRados *const store;
512 RGWHTTPManager *const http;
513 BucketTrimObserver *const observer;
514 std::vector<std::string>::const_iterator bucket;
515 std::vector<std::string>::const_iterator end;
516 public:
517 BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
518 BucketTrimObserver *observer,
519 const std::vector<std::string>& buckets,
520 int max_concurrent)
521 : RGWShardCollectCR(store->ctx(), max_concurrent),
522 store(store), http(http), observer(observer),
523 bucket(buckets.begin()), end(buckets.end())
524 {}
525 bool spawn_next() override;
526 };
527
528 bool BucketTrimInstanceCollectCR::spawn_next()
529 {
530 if (bucket == end) {
531 return false;
532 }
533 spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
534 ++bucket;
535 return true;
536 }
537
538 /// correlate the replies from each peer gateway into the given counter
539 int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
540 {
541 counter.clear();
542
543 try {
544 // decode notify responses
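// (a rados notify reply decodes as a map from responder (notifier id, cookie)
// to its reply payload, followed by the set of responders that timed out;
// each payload here carries an encoded TrimCounters::Response)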
545 auto p = bl.begin();
546 std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
547 std::set<std::pair<uint64_t, uint64_t>> timeouts;
548 ::decode(replies, p);
549 ::decode(timeouts, p);
550
551 for (auto& peer : replies) {
552 auto q = peer.second.begin();
553 TrimCounters::Response response;
554 ::decode(response, q);
555 for (const auto& b : response.bucket_counters) {
556 counter.insert(b.bucket, b.count);
557 }
558 }
559 } catch (const buffer::error& e) {
560 return -EIO;
561 }
562 return 0;
563 }
564
565 /// metadata callback has the signature bool(string&& key, string&& marker)
566 using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
567
568 /// lists metadata keys, passing each to a callback until it returns false.
569 /// on reaching the end, it will restart at the beginning and list up to the
570 /// initial marker
571 class AsyncMetadataList : public RGWAsyncRadosRequest {
572 CephContext *const cct;
573 RGWMetadataManager *const mgr;
574 const std::string section;
575 const std::string start_marker;
576 MetadataListCallback callback;
577
578 int _send_request() override;
579 public:
580 AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
581 RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
582 const std::string& section, const std::string& start_marker,
583 const MetadataListCallback& callback)
584 : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
585 section(section), start_marker(start_marker), callback(callback)
586 {}
587 };
588
589 int AsyncMetadataList::_send_request()
590 {
591 void* handle = nullptr;
592 std::list<std::string> keys;
593 bool truncated{false};
594 std::string marker;
595
596 // start a listing at the given marker
597 int r = mgr->list_keys_init(section, start_marker, &handle);
598 if (r == -EINVAL) {
599 // restart with empty marker below
600 } else if (r < 0) {
601 ldout(cct, 10) << "failed to init metadata listing: "
602 << cpp_strerror(r) << dendl;
603 return r;
604 } else {
605 ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;
606
607 // release the handle when scope exits
608 auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
609
610 do {
611 // get the next key and marker
612 r = mgr->list_keys_next(handle, 1, keys, &truncated);
613 if (r < 0) {
614 ldout(cct, 10) << "failed to list metadata: "
615 << cpp_strerror(r) << dendl;
616 return r;
617 }
618 marker = mgr->get_marker(handle);
619
620 if (!keys.empty()) {
621         assert(keys.size() == 1);
622 auto& key = keys.front();
623 if (!callback(std::move(key), std::move(marker))) {
624 return 0;
625 }
626 }
627 } while (truncated);
628
629 if (start_marker.empty()) {
630 // already listed all keys
631 return 0;
632 }
633 }
634
635 // restart the listing from the beginning (empty marker)
636 handle = nullptr;
637
638 r = mgr->list_keys_init(section, "", &handle);
639 if (r < 0) {
640 ldout(cct, 10) << "failed to restart metadata listing: "
641 << cpp_strerror(r) << dendl;
642 return r;
643 }
644 ldout(cct, 20) << "restarting metadata listing" << dendl;
645
646 // release the handle when scope exits
647 auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
648 do {
649 // get the next key and marker
650 r = mgr->list_keys_next(handle, 1, keys, &truncated);
651 if (r < 0) {
652 ldout(cct, 10) << "failed to list metadata: "
653 << cpp_strerror(r) << dendl;
654 return r;
655 }
656 marker = mgr->get_marker(handle);
657
658 if (!keys.empty()) {
659 assert(keys.size() == 1);
660 auto& key = keys.front();
661 // stop at original marker
662 if (marker >= start_marker) {
663 return 0;
664 }
665 if (!callback(std::move(key), std::move(marker))) {
666 return 0;
667 }
668 }
669 } while (truncated);
670
671 return 0;
672 }
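// Note: the second pass above stops as soon as the listing marker catches up
// to the original start_marker, so one cycle covers the key space roughly
// once even though it wraps around the end.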
673
674 /// coroutine wrapper for AsyncMetadataList
675 class MetadataListCR : public RGWSimpleCoroutine {
676 RGWAsyncRadosProcessor *const async_rados;
677 RGWMetadataManager *const mgr;
678 const std::string& section;
679 const std::string& start_marker;
680 MetadataListCallback callback;
681 RGWAsyncRadosRequest *req{nullptr};
682 public:
683 MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
684 RGWMetadataManager *mgr, const std::string& section,
685 const std::string& start_marker,
686 const MetadataListCallback& callback)
687 : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
688 section(section), start_marker(start_marker), callback(callback)
689 {}
690 ~MetadataListCR() override {
691 request_cleanup();
692 }
693
694 int send_request() override {
695 req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
696 mgr, section, start_marker, callback);
697 async_rados->queue(req);
698 return 0;
699 }
700 int request_complete() override {
701 return req->get_ret_status();
702 }
703 void request_cleanup() override {
704 if (req) {
705 req->finish();
706 req = nullptr;
707 }
708 }
709 };
710
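/// single bucket-trim pass: gather the hottest bucket instances from peer
/// change counters, top up with "cold" buckets listed from bucket.instance
/// metadata starting at the persisted marker, trim each instance with bounded
/// concurrency, persist the new cold-bucket marker, then notify peers so they
/// can reset their counters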
711 class BucketTrimCR : public RGWCoroutine {
712 RGWRados *const store;
713 RGWHTTPManager *const http;
714 const BucketTrimConfig& config;
715 BucketTrimObserver *const observer;
716 const rgw_raw_obj& obj;
717 ceph::mono_time start_time;
718 bufferlist notify_replies;
719 BucketChangeCounter counter;
720 std::vector<std::string> buckets; //< buckets selected for trim
721 BucketTrimStatus status;
722 RGWObjVersionTracker objv; //< version tracker for trim status object
723 std::string last_cold_marker; //< position for next trim marker
724
725 static const std::string section; //< metadata section for bucket instances
726 public:
727 BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
728 const BucketTrimConfig& config, BucketTrimObserver *observer,
729 const rgw_raw_obj& obj)
730 : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
731 observer(observer), obj(obj), counter(config.counter_size)
732 {}
733
734 int operate();
735 };
736
737 const std::string BucketTrimCR::section{"bucket.instance"};
738
739 int BucketTrimCR::operate()
740 {
741 reenter(this) {
742 start_time = ceph::mono_clock::now();
743
744 if (config.buckets_per_interval) {
745 // query watch/notify for hot buckets
746 ldout(cct, 10) << "fetching active bucket counters" << dendl;
747 set_status("fetching active bucket counters");
748 yield {
749 // request the top bucket counters from each peer gateway
750 const TrimNotifyType type = NotifyTrimCounters;
751 TrimCounters::Request request{32};
752 bufferlist bl;
753 ::encode(type, bl);
754 ::encode(request, bl);
755 call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
756 &notify_replies));
757 }
758 if (retcode < 0) {
759 ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
760 return set_cr_error(retcode);
761 }
762
763 // select the hottest buckets for trim
764 retcode = accumulate_peer_counters(notify_replies, counter);
765 if (retcode < 0) {
766 ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
767 return set_cr_error(retcode);
768 }
769 buckets.reserve(config.buckets_per_interval);
770
771 const int max_count = config.buckets_per_interval -
772 config.min_cold_buckets_per_interval;
773 counter.get_highest(max_count,
774 [this] (const std::string& bucket, int count) {
775 buckets.push_back(bucket);
776 });
777 }
778
779 if (buckets.size() < config.buckets_per_interval) {
780 // read BucketTrimStatus for marker position
781 set_status("reading trim status");
782 using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
783 yield call(new ReadStatus(store->get_async_rados(), store, obj,
784 &status, true, &objv));
785 if (retcode < 0) {
786 ldout(cct, 10) << "failed to read bilog trim status: "
787 << cpp_strerror(retcode) << dendl;
788 return set_cr_error(retcode);
789 }
790 if (status.marker == "MAX") {
791 status.marker.clear(); // restart at the beginning
792 }
793 ldout(cct, 10) << "listing cold buckets from marker="
794 << status.marker << dendl;
795
796 set_status("listing cold buckets for trim");
797 yield {
798 // capture a reference so 'this' remains valid in the callback
799 auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
800 // list cold buckets to consider for trim
801 auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
802 // filter out keys that we trimmed recently
803 if (observer->trimmed_recently(bucket)) {
804 return true;
805 }
806 // filter out active buckets that we've already selected
807 auto i = std::find(buckets.begin(), buckets.end(), bucket);
808 if (i != buckets.end()) {
809 return true;
810 }
811 buckets.emplace_back(std::move(bucket));
812 // remember the last cold bucket spawned to update the status marker
813 last_cold_marker = std::move(marker);
814 // return true if there's room for more
815 return buckets.size() < config.buckets_per_interval;
816 };
817
818 call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
819 section, status.marker, cb));
820 }
821 if (retcode < 0) {
822 ldout(cct, 4) << "failed to list bucket instance metadata: "
823 << cpp_strerror(retcode) << dendl;
824 return set_cr_error(retcode);
825 }
826 }
827
828 // trim bucket instances with limited concurrency
829 set_status("trimming buckets");
830 ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
831 yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
832 config.concurrent_buckets));
833 // ignore errors from individual buckets
834
835 // write updated trim status
836 if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
837 set_status("writing updated trim status");
838 status.marker = std::move(last_cold_marker);
839 ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
840 using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
841 yield call(new WriteStatus(store->get_async_rados(), store, obj,
842 status, &objv));
843 if (retcode < 0) {
844 ldout(cct, 4) << "failed to write updated trim status: "
845 << cpp_strerror(retcode) << dendl;
846 return set_cr_error(retcode);
847 }
848 }
849
850 // notify peers that trim completed
851 set_status("trim completed");
852 yield {
853 const TrimNotifyType type = NotifyTrimComplete;
854 TrimComplete::Request request;
855 bufferlist bl;
856 ::encode(type, bl);
857 ::encode(request, bl);
858 call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
859 nullptr));
860 }
861 if (retcode < 0) {
862 ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
863 return set_cr_error(retcode);
864 }
865
866 ldout(cct, 4) << "bucket index log processing completed in "
867 << ceph::mono_clock::now() - start_time << dendl;
868 return set_cr_done();
869 }
870 return 0;
871 }
872
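/// periodically poll for trim work: take a timed lock on the trim status
/// object so only one gateway runs BucketTrimCR per interval; the lock is
/// only released early on error so another gateway can retry sooner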
873 class BucketTrimPollCR : public RGWCoroutine {
874 RGWRados *const store;
875 RGWHTTPManager *const http;
876 const BucketTrimConfig& config;
877 BucketTrimObserver *const observer;
878 const rgw_raw_obj& obj;
879 const std::string name{"trim"}; //< lock name
880 const std::string cookie;
881
882 public:
883 BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
884 const BucketTrimConfig& config,
885 BucketTrimObserver *observer, const rgw_raw_obj& obj)
886 : RGWCoroutine(store->ctx()), store(store), http(http),
887 config(config), observer(observer), obj(obj),
888 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
889 {}
890
891 int operate();
892 };
893
894 int BucketTrimPollCR::operate()
895 {
896 reenter(this) {
897 for (;;) {
898 set_status("sleeping");
899 wait(utime_t{config.trim_interval_sec, 0});
900
901 // prevent others from trimming for our entire wait interval
902 set_status("acquiring trim lock");
903 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
904 obj, name, cookie,
905 config.trim_interval_sec));
906 if (retcode < 0) {
907 ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
908 continue;
909 }
910
911 set_status("trimming");
912 yield call(new BucketTrimCR(store, http, config, observer, obj));
913 if (retcode < 0) {
914 // on errors, unlock so other gateways can try
915 set_status("unlocking");
916 yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
917 obj, name, cookie));
918 }
919 }
920 }
921 return 0;
922 }
923
924 /// tracks a bounded list of events with timestamps. old events can be expired,
925 /// and recent events can be searched by key. expiration depends on events being
926 /// inserted in temporal order
927 template <typename T, typename Clock = ceph::coarse_mono_clock>
928 class RecentEventList {
929 public:
930 using clock_type = Clock;
931 using time_point = typename clock_type::time_point;
932
933 RecentEventList(size_t max_size, const ceph::timespan& max_duration)
934 : events(max_size), max_duration(max_duration)
935 {}
936
937 /// insert an event at the given point in time. this time must be at least as
938 /// recent as the last inserted event
939 void insert(T&& value, const time_point& now) {
940 // assert(events.empty() || now >= events.back().time)
941 events.push_back(Event{std::move(value), now});
942 }
943
944 /// performs a linear search for an event matching the given key, whose type
945 /// U can be any that provides operator==(U, T)
946 template <typename U>
947 bool lookup(const U& key) const {
948 for (const auto& event : events) {
949 if (key == event.value) {
950 return true;
951 }
952 }
953 return false;
954 }
955
956 /// remove events that are no longer recent compared to the given point in time
957 void expire_old(const time_point& now) {
958 const auto expired_before = now - max_duration;
959 while (!events.empty() && events.front().time < expired_before) {
960 events.pop_front();
961 }
962 }
963
964 private:
965 struct Event {
966 T value;
967 time_point time;
968 };
969 boost::circular_buffer<Event> events;
970 const ceph::timespan max_duration;
971 };
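// Usage sketch (illustrative, mirrors BucketTrimManager::Impl below):
//
//   RecentEventList<std::string> trimmed{128, std::chrono::hours(2)};
//   auto now = RecentEventList<std::string>::clock_type::now();
//   trimmed.insert("bucket:instance-id", now);
//   bool recent = trimmed.lookup("bucket:instance-id"); // true until expired
//   trimmed.expire_old(now);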
972
973 namespace rgw {
974
975 // read bucket trim configuration from ceph context
976 void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
977 {
978 auto conf = cct->_conf;
979
980 config.trim_interval_sec =
981 conf->get_val<int64_t>("rgw_sync_log_trim_interval");
982 config.counter_size = 512;
983 config.buckets_per_interval =
984 conf->get_val<int64_t>("rgw_sync_log_trim_max_buckets");
985 config.min_cold_buckets_per_interval =
986 conf->get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
987 config.concurrent_buckets =
988 conf->get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
989 config.notify_timeout_ms = 10000;
990 config.recent_size = 128;
991 config.recent_duration = std::chrono::hours(2);
992 }
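// The values above come from the rgw_sync_log_trim_* config options (interval,
// max buckets, min cold buckets, concurrent buckets); counter_size,
// notify_timeout_ms, recent_size and recent_duration are currently fixed here
// rather than exposed as options.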
993
994 class BucketTrimManager::Impl : public TrimCounters::Server,
995 public BucketTrimObserver {
996 public:
997 RGWRados *const store;
998 const BucketTrimConfig config;
999
1000 const rgw_raw_obj status_obj;
1001
1002 /// count frequency of bucket instance entries in the data changes log
1003 BucketChangeCounter counter;
1004
1005 using RecentlyTrimmedBucketList = RecentEventList<std::string>;
1006 using clock_type = RecentlyTrimmedBucketList::clock_type;
1007 /// track recently trimmed buckets to focus trim activity elsewhere
1008 RecentlyTrimmedBucketList trimmed;
1009
1010 /// serve the bucket trim watch/notify api
1011 BucketTrimWatcher watcher;
1012
1013 /// protect data shared between data sync, trim, and watch/notify threads
1014 std::mutex mutex;
1015
1016 Impl(RGWRados *store, const BucketTrimConfig& config)
1017 : store(store), config(config),
1018 status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
1019 counter(config.counter_size),
1020 trimmed(config.recent_size, config.recent_duration),
1021 watcher(store, status_obj, this)
1022 {}
1023
1024 /// TrimCounters::Server interface for watch/notify api
1025   void get_bucket_counters(int count, TrimCounters::Vector& buckets) override {
1026 buckets.reserve(count);
1027 std::lock_guard<std::mutex> lock(mutex);
1028 counter.get_highest(count, [&buckets] (const std::string& key, int count) {
1029 buckets.emplace_back(key, count);
1030 });
1031 ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
1032 }
1033
1034 void reset_bucket_counters() override {
1035 ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
1036 std::lock_guard<std::mutex> lock(mutex);
1037 counter.clear();
1038 trimmed.expire_old(clock_type::now());
1039 }
1040
1041 /// BucketTrimObserver interface to remember successfully-trimmed buckets
1042 void on_bucket_trimmed(std::string&& bucket_instance) override {
1043 ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
1044 std::lock_guard<std::mutex> lock(mutex);
1045 trimmed.insert(std::move(bucket_instance), clock_type::now());
1046 }
1047
1048 bool trimmed_recently(const boost::string_view& bucket_instance) override {
1049 std::lock_guard<std::mutex> lock(mutex);
1050 return trimmed.lookup(bucket_instance);
1051 }
1052 };
1053
1054 BucketTrimManager::BucketTrimManager(RGWRados *store,
1055 const BucketTrimConfig& config)
1056 : impl(new Impl(store, config))
1057 {
1058 }
1059 BucketTrimManager::~BucketTrimManager() = default;
1060
1061 int BucketTrimManager::init()
1062 {
1063 return impl->watcher.start();
1064 }
1065
1066 void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
1067 {
1068 std::lock_guard<std::mutex> lock(impl->mutex);
1069 // filter recently trimmed bucket instances out of bucket change counter
1070 if (impl->trimmed.lookup(bucket)) {
1071 return;
1072 }
1073 impl->counter.insert(bucket.to_string());
1074 }
1075
1076 RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
1077 {
1078 return new BucketTrimPollCR(impl->store, http, impl->config,
1079 impl.get(), impl->status_obj);
1080 }
1081
1082 RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
1083 {
1084 // return the trim coroutine without any polling
1085 return new BucketTrimCR(impl->store, http, impl->config,
1086 impl.get(), impl->status_obj);
1087 }
1088
1089 } // namespace rgw