// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#include <mutex>
#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>

#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_sync_log_trim.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_rados.h"
#include "rgw_sync.h"

#include <boost/asio/yield.hpp>
#include "include/assert.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "trim: ")

using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
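//
// BoundedKeyCounter keeps a bounded set of per-key counts and can report the
// hottest keys. A minimal usage sketch (illustrative only, not part of the
// build), using just the insert()/get_highest()/clear() calls relied on below:
//
//   BucketChangeCounter counter(512);      // bound on tracked keys
//   counter.insert("bucket-a");            // increment by 1
//   counter.insert("bucket-b", 3);         // increment by an explicit count
//   counter.get_highest(2, [] (const std::string& key, int count) {
//     // visits the highest counts first, e.g. ("bucket-b", 3), ("bucket-a", 1)
//   });
//   counter.clear();
//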

const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;


// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0,
  NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);
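//
// Every notification payload is framed as the raw-encoded TrimNotifyType
// followed by the request body for that type; handle_notify() below decodes
// the type first and dispatches to the matching TrimNotifyHandler. Sender-side
// sketch (mirrors what BucketTrimCR::operate() does further down):
//
//   bufferlist bl;
//   ::encode(NotifyTrimCounters, bl);         // frame: notification type
//   ::encode(TrimCounters::Request{32}, bl);  // then the type-specific body
//   // deliver bl via watch/notify on the trim status object
//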

struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
};

/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::iterator& p);
  };

  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    Handler(Server *server) : server(server) {}

    void handle(bufferlist::iterator& input, bufferlist& output) override;
  };
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
{
  return out << rhs.bucket << ":" << rhs.count;
}

void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  // no versioning to save space
  ::encode(bucket, bl);
  ::encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)
{
  ::decode(bucket, p);
  ::decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);

void TrimCounters::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ::encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::iterator& p)
{
  DECODE_START(1, p);
  ::decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);

void TrimCounters::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ::encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::iterator& p)
{
  DECODE_START(1, p);
  ::decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);

void TrimCounters::Handler::handle(bufferlist::iterator& input,
                                   bufferlist& output)
{
  Request request;
  ::decode(request, input);
  auto count = std::min<uint16_t>(request.max_buckets, 128);

  Response response;
  server->get_bucket_counters(count, response.bucket_counters);
  ::encode(response, output);
}

/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::iterator& p);
  };
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    Handler(Server *server) : server(server) {}

    void handle(bufferlist::iterator& input, bufferlist& output) override;
  };
};

void TrimComplete::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);

void TrimComplete::Handler::handle(bufferlist::iterator& input,
                                   bufferlist& output)
{
  Request request;
  ::decode(request, input);

  server->reset_bucket_counters();

  Response response;
  ::encode(response, output);
}


/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
  RGWRados *const store;
  const rgw_raw_obj& obj;
  rgw_rados_ref ref;
  uint64_t handle{0};

  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

 public:
  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
  }

  ~BucketTrimWatcher() {
    stop();
  }

  int start() {
    int r = store->get_raw_obj_ref(obj, &ref);
    if (r < 0) {
      return r;
    }

    // register a watch on the realm's control object
    r = ref.ioctx.watch2(ref.oid, &handle, this);
    if (r == -ENOENT) {
      constexpr bool exclusive = true;
      r = ref.ioctx.create(ref.oid, exclusive);
      if (r == -EEXIST || r == 0) {
        r = ref.ioctx.watch2(ref.oid, &handle, this);
      }
    }
    if (r < 0) {
      lderr(store->ctx()) << "Failed to watch " << ref.oid
          << " with " << cpp_strerror(-r) << dendl;
      ref.ioctx.close();
      return r;
    }

    ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;
    return 0;
  }

  int restart() {
    int r = ref.ioctx.unwatch2(handle);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
          << " with " << cpp_strerror(-r) << dendl;
    }
    r = ref.ioctx.watch2(ref.oid, &handle, this);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
          << " with " << cpp_strerror(-r) << dendl;
      ref.ioctx.close();
    }
    return r;
  }

  void stop() {
    if (handle) {
      ref.ioctx.unwatch2(handle);
      ref.ioctx.close();
    }
  }

  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    if (cookie != handle) {
      return;
    }
    bufferlist reply;
    try {
      auto p = bl.begin();
      TrimNotifyType type;
      ::decode(type, p);

      auto handler = handlers.find(type);
      if (handler != handlers.end()) {
        handler->second->handle(p, reply);
      } else {
        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
      }
    } catch (const buffer::error& e) {
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    }
    ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);
  }

  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
      return;
    }
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
      restart();
    }
  }
};


/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
};

/// populate the status with the minimum stable marker of each shard
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  status->clear();
  boost::optional<size_t> num_shards;
  for (auto peer = first; peer != last; ++peer) {
    const size_t peer_shards = peer->size();
    if (!num_shards) {
      num_shards = peer_shards;
      status->resize(*num_shards);
    } else if (*num_shards != peer_shards) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // only consider incremental sync markers
      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}
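//
// For example, with two peers reporting incremental-sync markers per shard:
//   peer A: shard0 = "00001", shard1 = "00007"
//   peer B: shard0 = "00003", shard1 = "00005"
// the resulting status is { "00001", "00005" } - the oldest marker of each
// shard - so trimming up to these positions cannot discard log entries that a
// peer still needs. Markers left empty are skipped by the shard trim below.
//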

/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  RGWRados *const store;
  const RGWBucketInfo& bucket_info;
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};

bool BucketTrimShardCollectCR::spawn_next()
{
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldout(cct, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
                                    std::string{}, marker),
            false);
      return true;
    }
  }
  return false;
}

/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->get_zone().id),
      peer_status(store->zone_conn_map.size())
  {}

  int operate() override;
};

int BucketTrimInstanceCR::operate()
{
  reenter(this) {
    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    // query peers for sync status
    set_status("fetching sync status from peers");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "bucket-index" },
        { "status", nullptr },
        { "bucket", bucket_instance.c_str() },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };

      auto p = peer_status.begin();
      for (auto& c : store->zone_conn_map) {
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
      // in parallel, read the local bucket instance info
      spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
                                           bucket_instance, &bucket_info),
            false);
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      int child_ret;
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        drain_all();
        return set_cr_error(child_ret);
      }
    }

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }

    // trim shards with a ShardCollectCR
    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}

/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  RGWRados *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
 public:
  BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end())
  {}
  bool spawn_next() override;
};

bool BucketTrimInstanceCollectCR::spawn_next()
{
  if (bucket == end) {
    return false;
  }
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
  ++bucket;
  return true;
}

/// correlate the replies from each peer gateway into the given counter
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
{
  counter.clear();

  try {
    // decode notify responses
    auto p = bl.begin();
    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
    std::set<std::pair<uint64_t, uint64_t>> timeouts;
    ::decode(replies, p);
    ::decode(timeouts, p);

    for (auto& peer : replies) {
      auto q = peer.second.begin();
      TrimCounters::Response response;
      ::decode(response, q);
      for (const auto& b : response.bucket_counters) {
        counter.insert(b.bucket, b.count);
      }
    }
  } catch (const buffer::error& e) {
    return -EIO;
  }
  return 0;
}

/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;

/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;
  const std::string start_marker;
  MetadataListCallback callback;
  void *handle{nullptr};

  int _send_request() override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~AsyncMetadataList() override {
    if (handle) {
      mgr->list_keys_complete(handle);
    }
  }
};

int AsyncMetadataList::_send_request()
{
  // start a listing at the given marker
  int r = mgr->list_keys_init(section, start_marker, &handle);
  if (r < 0) {
    ldout(cct, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;

  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  do {
    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
    if (r < 0) {
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      assert(keys.size() == 1);
      auto& key = keys.front();
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  if (start_marker.empty()) {
    // already listed all keys
    return 0;
  }

  // restart the listing from the beginning (empty marker)
  mgr->list_keys_complete(handle);
  handle = nullptr;

  r = mgr->list_keys_init(section, "", &handle);
  if (r < 0) {
    ldout(cct, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldout(cct, 20) << "restarting metadata listing" << dendl;

  do {
    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
    if (r < 0) {
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker >= start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}
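//
// The two loops above implement a circular scan of the metadata section: keys
// are listed from start_marker to the end, then the listing restarts at the
// beginning and stops once it reaches start_marker again. Successive trim
// intervals can therefore resume from the last stored marker without
// permanently skipping any bucket instance.
//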

/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr};
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request() override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish();
      req = nullptr;
    }
  }
};

class BucketTrimCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  ceph::mono_time start_time;
  bufferlist notify_replies;
  BucketChangeCounter counter;
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size)
  {}

  int operate();
};

const std::string BucketTrimCR::section{"bucket.instance"};

int BucketTrimCR::operate()
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldout(cct, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        ::encode(type, bl);
        ::encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

      const int max_count = config.buckets_per_interval -
                            config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
        [this] (const std::string& bucket, int count) {
          buckets.push_back(bucket);
        });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(store->get_async_rados(), store, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldout(cct, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldout(cct, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(store->get_async_rados(), store, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      ::encode(type, bl);
      ::encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldout(cct, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}

class BucketTrimPollCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  const std::string name{"trim"}; //< lock name
  const std::string cookie;

 public:
  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate();
};

int BucketTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{config.trim_interval_sec, 0});

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}

/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
 public:
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)
  {}

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // assert(events.empty() || now >= events.back().time)
    events.push_back(Event{std::move(value), now});
  }

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {
        return true;
      }
    }
    return false;
  }

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    while (!events.empty() && events.front().time < expired_before) {
      events.pop_front();
    }
  }

 private:
  struct Event {
    T value;
    time_point time;
  };
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
};
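//
// A brief usage sketch (illustrative only), matching how BucketTrimManager
// uses this below: remember a bucket instance when its trim completes, skip it
// while it is still "recent", and age entries out on the next cycle. Here
// 'clock' stands for the list's clock_type (ceph::coarse_mono_clock):
//
//   RecentEventList<std::string> trimmed(128, std::chrono::hours(2));
//   trimmed.insert("bucket:instance-id", clock::now());  // on_bucket_trimmed
//   if (trimmed.lookup("bucket:instance-id")) { /* skip it */ }
//   trimmed.expire_old(clock::now());                    // reset_bucket_counters
//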

namespace rgw {

// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
  auto conf = cct->_conf;

  config.trim_interval_sec =
      conf->get_val<int64_t>("rgw_sync_log_trim_interval");
  config.counter_size = 512;
  config.buckets_per_interval =
      conf->get_val<int64_t>("rgw_sync_log_trim_max_buckets");
  config.min_cold_buckets_per_interval =
      conf->get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
  config.concurrent_buckets =
      conf->get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
  config.notify_timeout_ms = 10000;
  config.recent_size = 128;
  config.recent_duration = std::chrono::hours(2);
}

class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
 public:
  RGWRados *const store;
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj;

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads
  std::mutex mutex;

  Impl(RGWRados *store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)
  {}

  /// TrimCounters::Server interface for watch/notify api
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
        buckets.emplace_back(key, count);
      });
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
  }

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    counter.clear();
    trimmed.expire_old(clock_type::now());
  }

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());
  }

  bool trimmed_recently(const boost::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
  }
};

BucketTrimManager::BucketTrimManager(RGWRados *store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}
BucketTrimManager::~BucketTrimManager() = default;

int BucketTrimManager::init()
{
  return impl->watcher.start();
}

void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
{
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {
    return;
  }
  impl->counter.insert(bucket.to_string());
}

RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj);
}

RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj);
}
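
// Typical wiring (illustrative sketch; the actual call sites live outside this
// file): build the config, construct a manager, start its watcher, then run
// the polling coroutine in a coroutine manager alongside data sync:
//
//   BucketTrimConfig config;
//   configure_bucket_trim(store->ctx(), config);
//   rgw::BucketTrimManager trim(store, config);
//   int r = trim.init();  // registers the watch/notify watcher
//   // later, with an RGWCoroutinesManager 'crs' and RGWHTTPManager *http:
//   //   crs.run(trim.create_bucket_trim_cr(http));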

} // namespace rgw