1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2017 Red Hat, Inc
8 *
9 * Author: Casey Bodley <cbodley@redhat.com>
10 *
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
15 */
16
17 #include <mutex>
18 #include <boost/circular_buffer.hpp>
19 #include <boost/container/flat_map.hpp>
20
21 #include "include/scope_guard.h"
22 #include "common/bounded_key_counter.h"
23 #include "common/errno.h"
24 #include "rgw_sync_log_trim.h"
25 #include "rgw_cr_rados.h"
26 #include "rgw_cr_rest.h"
27 #include "rgw_data_sync.h"
28 #include "rgw_metadata.h"
29 #include "rgw_rados.h"
30 #include "rgw_zone.h"
31 #include "rgw_sync.h"
32
33 #include "services/svc_zone.h"
34
35 #include <boost/asio/yield.hpp>
36 #include "include/ceph_assert.h"
37
38 #define dout_subsys ceph_subsys_rgw
39
40 #undef dout_prefix
41 #define dout_prefix (*_dout << "trim: ")
42
43 using rgw::BucketTrimConfig;
44 using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
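// BoundedKeyCounter tracks per-key change counts for a bounded number of keys.
// A rough usage sketch (the key string and sizes below are illustrative only):
//
//   BucketChangeCounter counter(512);            // bound on tracked keys
//   counter.insert("bucket:instance-key");       // count one change
//   counter.insert("bucket:instance-key", 3);    // or add a weighted count
//   counter.get_highest(10, [] (const std::string& key, int count) {
//     // visit up to the 10 keys with the highest counts
//   });
//   counter.clear();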
45
46 const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
47 using rgw::BucketTrimStatus;
48
49
50 // watch/notify api for gateways to coordinate about which buckets to trim
51 enum TrimNotifyType {
52 NotifyTrimCounters = 0,
53 NotifyTrimComplete,
54 };
55 WRITE_RAW_ENCODER(TrimNotifyType);
56
57 struct TrimNotifyHandler {
58 virtual ~TrimNotifyHandler() = default;
59
60 virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
61 };
62
63 /// api to share the bucket trim counters between gateways in the same zone.
64 /// each gateway will process different datalog shards, so the gateway that runs
65 /// the trim process needs to accumulate their counters
66 struct TrimCounters {
67 /// counter for a single bucket
68 struct BucketCounter {
69 std::string bucket; //< bucket instance metadata key
70 int count{0};
71
72 BucketCounter() = default;
73 BucketCounter(const std::string& bucket, int count)
74 : bucket(bucket), count(count) {}
75
76 void encode(bufferlist& bl) const;
77 void decode(bufferlist::const_iterator& p);
78 };
79 using Vector = std::vector<BucketCounter>;
80
81 /// request bucket trim counters from peer gateways
82 struct Request {
83 uint16_t max_buckets; //< maximum number of bucket counters to return
84
85 void encode(bufferlist& bl) const;
86 void decode(bufferlist::const_iterator& p);
87 };
88
89 /// return the current bucket trim counters
90 struct Response {
91 Vector bucket_counters;
92
93 void encode(bufferlist& bl) const;
94 void decode(bufferlist::const_iterator& p);
95 };
96
97 /// server interface to query the hottest buckets
98 struct Server {
99 virtual ~Server() = default;
100
101 virtual void get_bucket_counters(int count, Vector& counters) = 0;
102 virtual void reset_bucket_counters() = 0;
103 };
104
105 /// notify handler
106 class Handler : public TrimNotifyHandler {
107 Server *const server;
108 public:
109 explicit Handler(Server *server) : server(server) {}
110
111 void handle(bufferlist::const_iterator& input, bufferlist& output) override;
112 };
113 };
114 std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
115 {
116 return out << rhs.bucket << ":" << rhs.count;
117 }
118
119 void TrimCounters::BucketCounter::encode(bufferlist& bl) const
120 {
121 using ceph::encode;
122 // no versioning to save space
123 encode(bucket, bl);
124 encode(count, bl);
125 }
126 void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
127 {
128 using ceph::decode;
129 decode(bucket, p);
130 decode(count, p);
131 }
132 WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
133
134 void TrimCounters::Request::encode(bufferlist& bl) const
135 {
136 ENCODE_START(1, 1, bl);
137 encode(max_buckets, bl);
138 ENCODE_FINISH(bl);
139 }
140 void TrimCounters::Request::decode(bufferlist::const_iterator& p)
141 {
142 DECODE_START(1, p);
143 decode(max_buckets, p);
144 DECODE_FINISH(p);
145 }
146 WRITE_CLASS_ENCODER(TrimCounters::Request);
147
148 void TrimCounters::Response::encode(bufferlist& bl) const
149 {
150 ENCODE_START(1, 1, bl);
151 encode(bucket_counters, bl);
152 ENCODE_FINISH(bl);
153 }
154 void TrimCounters::Response::decode(bufferlist::const_iterator& p)
155 {
156 DECODE_START(1, p);
157 decode(bucket_counters, p);
158 DECODE_FINISH(p);
159 }
160 WRITE_CLASS_ENCODER(TrimCounters::Response);
161
162 void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
163 bufferlist& output)
164 {
165 Request request;
166 decode(request, input);
167 auto count = std::min<uint16_t>(request.max_buckets, 128);
168
169 Response response;
170 server->get_bucket_counters(count, response.bucket_counters);
171 encode(response, output);
172 }
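// Sketch of the sender side of this exchange, as assembled by BucketTrimCR
// further down: the notify payload is the TrimNotifyType header followed by
// the type-specific Request, and each peer answers through Handler::handle()
// with an encoded Response.
//
//   bufferlist bl;
//   const TrimNotifyType type = NotifyTrimCounters;
//   TrimCounters::Request request{32};   // ask each peer for its top 32 buckets
//   encode(type, bl);
//   encode(request, bl);
//   // delivered via RGWRadosNotifyCR; accumulate_peer_counters() below merges
//   // the peers' Responses into a BucketChangeCounter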
173
174 /// api to notify peer gateways that trim has completed and their bucket change
175 /// counters can be reset
176 struct TrimComplete {
177 struct Request {
178 void encode(bufferlist& bl) const;
179 void decode(bufferlist::const_iterator& p);
180 };
181 struct Response {
182 void encode(bufferlist& bl) const;
183 void decode(bufferlist::const_iterator& p);
184 };
185
186 /// server interface to reset bucket counters
187 using Server = TrimCounters::Server;
188
189 /// notify handler
190 class Handler : public TrimNotifyHandler {
191 Server *const server;
192 public:
193 explicit Handler(Server *server) : server(server) {}
194
195 void handle(bufferlist::const_iterator& input, bufferlist& output) override;
196 };
197 };
198
199 void TrimComplete::Request::encode(bufferlist& bl) const
200 {
201 ENCODE_START(1, 1, bl);
202 ENCODE_FINISH(bl);
203 }
204 void TrimComplete::Request::decode(bufferlist::const_iterator& p)
205 {
206 DECODE_START(1, p);
207 DECODE_FINISH(p);
208 }
209 WRITE_CLASS_ENCODER(TrimComplete::Request);
210
211 void TrimComplete::Response::encode(bufferlist& bl) const
212 {
213 ENCODE_START(1, 1, bl);
214 ENCODE_FINISH(bl);
215 }
216 void TrimComplete::Response::decode(bufferlist::const_iterator& p)
217 {
218 DECODE_START(1, p);
219 DECODE_FINISH(p);
220 }
221 WRITE_CLASS_ENCODER(TrimComplete::Response);
222
223 void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
224 bufferlist& output)
225 {
226 Request request;
227 decode(request, input);
228
229 server->reset_bucket_counters();
230
231 Response response;
232 encode(response, output);
233 }
234
235
236 /// rados watcher for bucket trim notifications
237 class BucketTrimWatcher : public librados::WatchCtx2 {
238 RGWRados *const store;
239 const rgw_raw_obj& obj;
240 rgw_rados_ref ref;
241 uint64_t handle{0};
242
243 using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
244 boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
245
246 public:
247 BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
248 TrimCounters::Server *counters)
249 : store(store), obj(obj) {
250 handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
251 handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
252 }
253
254 ~BucketTrimWatcher() {
255 stop();
256 }
257
258 int start() {
259 int r = store->get_raw_obj_ref(obj, &ref);
260 if (r < 0) {
261 return r;
262 }
263
264 // register a watch on the realm's control object
265 r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
266 if (r == -ENOENT) {
267 constexpr bool exclusive = true;
268 r = ref.ioctx.create(ref.obj.oid, exclusive);
269 if (r == -EEXIST || r == 0) {
270 r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
271 }
272 }
273 if (r < 0) {
274 lderr(store->ctx()) << "Failed to watch " << ref.obj
275 << " with " << cpp_strerror(-r) << dendl;
276 ref.ioctx.close();
277 return r;
278 }
279
280 ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
281 return 0;
282 }
283
284 int restart() {
285 int r = ref.ioctx.unwatch2(handle);
286 if (r < 0) {
287 lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
288 << " with " << cpp_strerror(-r) << dendl;
289 }
290 r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
291 if (r < 0) {
292 lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
293 << " with " << cpp_strerror(-r) << dendl;
294 ref.ioctx.close();
295 }
296 return r;
297 }
298
299 void stop() {
300 if (handle) {
301 ref.ioctx.unwatch2(handle);
302 ref.ioctx.close();
303 }
304 }
305
306 /// respond to bucket trim notifications
307 void handle_notify(uint64_t notify_id, uint64_t cookie,
308 uint64_t notifier_id, bufferlist& bl) override {
309 if (cookie != handle) {
310 return;
311 }
312 bufferlist reply;
313 try {
314 auto p = bl.cbegin();
315 TrimNotifyType type;
316 decode(type, p);
317
318 auto handler = handlers.find(type);
319 if (handler != handlers.end()) {
320 handler->second->handle(p, reply);
321 } else {
322 lderr(store->ctx()) << "no handler for notify type " << type << dendl;
323 }
324 } catch (const buffer::error& e) {
325 lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
326 }
327 ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply);
328 }
329
330 /// reestablish the watch if it gets disconnected
331 void handle_error(uint64_t cookie, int err) override {
332 if (cookie != handle) {
333 return;
334 }
335 if (err == -ENOTCONN) {
336 ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
337 restart();
338 }
339 }
340 };
341
342
343 /// Interface to communicate with the trim manager about completed operations
344 struct BucketTrimObserver {
345 virtual ~BucketTrimObserver() = default;
346
347 virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
348 virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
349 };
350
351 /// populate the status with the minimum stable marker of each shard
352 template <typename Iter>
353 int take_min_status(CephContext *cct, Iter first, Iter last,
354 std::vector<std::string> *status)
355 {
356 status->clear();
357 // The initialisation below is required to silence a false positive
358 // -Wmaybe-uninitialized warning
359 boost::optional<uint64_t> num_shards = boost::make_optional(false, uint64_t());
360 for (auto peer = first; peer != last; ++peer) {
361 const size_t peer_shards = peer->size();
362 if (!num_shards) {
363 num_shards = peer_shards;
364 status->resize(*num_shards);
365 } else if (*num_shards != peer_shards) {
366 // all peers must agree on the number of shards
367 return -EINVAL;
368 }
369 auto m = status->begin();
370 for (auto& shard : *peer) {
371 auto& marker = *m++;
372 // only consider incremental sync markers
373 if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
374 continue;
375 }
376 // always take the first marker, or any later marker that's smaller
377 if (peer == first || marker > shard.inc_marker.position) {
378 marker = std::move(shard.inc_marker.position);
379 }
380 }
381 }
382 return 0;
383 }
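// Example: with two peers reporting incremental-sync positions for two shards,
//   peer A: ["00023", "00017"]   peer B: ["00019", "00021"]
// the result is the per-shard minimum ["00019", "00017"]. Only markers from
// shards in incremental sync are considered, and peers that disagree on the
// shard count fail the whole call with -EINVAL.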
384
385 /// trim each bilog shard to the given marker, while limiting the number of
386 /// concurrent requests
387 class BucketTrimShardCollectCR : public RGWShardCollectCR {
388 static constexpr int MAX_CONCURRENT_SHARDS = 16;
389 RGWRados *const store;
390 const RGWBucketInfo& bucket_info;
391 const std::vector<std::string>& markers; //< shard markers to trim
392 size_t i{0}; //< index of current shard marker
393 public:
394 BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
395 const std::vector<std::string>& markers)
396 : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
397 store(store), bucket_info(bucket_info), markers(markers)
398 {}
399 bool spawn_next() override;
400 };
401
402 bool BucketTrimShardCollectCR::spawn_next()
403 {
404 while (i < markers.size()) {
405 const auto& marker = markers[i];
406 const auto shard_id = i++;
407
408 // skip empty markers
409 if (!marker.empty()) {
410 ldout(cct, 10) << "trimming bilog shard " << shard_id
411 << " of " << bucket_info.bucket << " at marker " << marker << dendl;
412 spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
413 std::string{}, marker),
414 false);
415 return true;
416 }
417 }
418 return false;
419 }
420
421 /// trim the bilog of all of the given bucket instance's shards
422 class BucketTrimInstanceCR : public RGWCoroutine {
423 RGWRados *const store;
424 RGWHTTPManager *const http;
425 BucketTrimObserver *const observer;
426 std::string bucket_instance;
427 const std::string& zone_id; //< my zone id
428 RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
429 int child_ret = 0;
430
431 using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
432 std::vector<StatusShards> peer_status; //< sync status for each peer
433 std::vector<std::string> min_markers; //< min marker per shard
434
435 public:
436 BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
437 BucketTrimObserver *observer,
438 const std::string& bucket_instance)
439 : RGWCoroutine(store->ctx()), store(store),
440 http(http), observer(observer),
441 bucket_instance(bucket_instance),
442 zone_id(store->svc.zone->get_zone().id),
443 peer_status(store->svc.zone->get_zone_conn_map().size())
444 {}
445
446 int operate() override;
447 };
448
449 int BucketTrimInstanceCR::operate()
450 {
451 reenter(this) {
452 ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
453
454 // query peers for sync status
455 set_status("fetching sync status from peers");
456 yield {
457 // query data sync status from each sync peer
458 rgw_http_param_pair params[] = {
459 { "type", "bucket-index" },
460 { "status", nullptr },
461 { "bucket", bucket_instance.c_str() },
462 { "source-zone", zone_id.c_str() },
463 { nullptr, nullptr }
464 };
465
466 auto p = peer_status.begin();
467 for (auto& c : store->svc.zone->get_zone_conn_map()) {
468 using StatusCR = RGWReadRESTResourceCR<StatusShards>;
469 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
470 false);
471 ++p;
472 }
473 // in parallel, read the local bucket instance info
474 spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
475 bucket_instance, &bucket_info),
476 false);
477 }
478 // wait for a response from each peer. all must respond to attempt trim
479 while (num_spawned()) {
480 yield wait_for_child();
481 collect(&child_ret, nullptr);
482 if (child_ret < 0) {
483 drain_all();
484 return set_cr_error(child_ret);
485 }
486 }
487
488 // determine the minimum marker for each shard
489 retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
490 &min_markers);
491 if (retcode < 0) {
492 ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
493 return set_cr_error(retcode);
494 }
495
496 // trim shards with a ShardCollectCR
497 ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
498 << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
499 set_status("trimming bilog shards");
500 yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
501 // ENODATA just means there were no keys to trim
502 if (retcode == -ENODATA) {
503 retcode = 0;
504 }
505 if (retcode < 0) {
506 ldout(cct, 4) << "failed to trim bilog shards: "
507 << cpp_strerror(retcode) << dendl;
508 return set_cr_error(retcode);
509 }
510
511 observer->on_bucket_trimmed(std::move(bucket_instance));
512 return set_cr_done();
513 }
514 return 0;
515 }
516
517 /// trim each bucket instance while limiting the number of concurrent operations
518 class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
519 RGWRados *const store;
520 RGWHTTPManager *const http;
521 BucketTrimObserver *const observer;
522 std::vector<std::string>::const_iterator bucket;
523 std::vector<std::string>::const_iterator end;
524 public:
525 BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
526 BucketTrimObserver *observer,
527 const std::vector<std::string>& buckets,
528 int max_concurrent)
529 : RGWShardCollectCR(store->ctx(), max_concurrent),
530 store(store), http(http), observer(observer),
531 bucket(buckets.begin()), end(buckets.end())
532 {}
533 bool spawn_next() override;
534 };
535
536 bool BucketTrimInstanceCollectCR::spawn_next()
537 {
538 if (bucket == end) {
539 return false;
540 }
541 spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
542 ++bucket;
543 return true;
544 }
545
546 /// correlate the replies from each peer gateway into the given counter
547 int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
548 {
549 counter.clear();
550
551 try {
552 // decode notify responses
553 auto p = bl.cbegin();
554 std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
555 std::set<std::pair<uint64_t, uint64_t>> timeouts;
556 decode(replies, p);
557 decode(timeouts, p);
558
559 for (auto& peer : replies) {
560 auto q = peer.second.cbegin();
561 TrimCounters::Response response;
562 decode(response, q);
563 for (const auto& b : response.bucket_counters) {
564 counter.insert(b.bucket, b.count);
565 }
566 }
567 } catch (const buffer::error& e) {
568 return -EIO;
569 }
570 return 0;
571 }
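// The reply bufferlist is decoded here as the librados notify envelope: a map
// of watcher (id, cookie) pairs to each acking gateway's reply payload,
// followed by a set of pairs for watchers that timed out; only the acks are
// expected to contain a TrimCounters::Response.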
572
573 /// metadata callback has the signature bool(string&& key, string&& marker)
574 using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
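// A minimal example callback that collects keys until it has gathered ten
// (the marker argument is ignored in this sketch):
//
//   std::vector<std::string> keys;
//   MetadataListCallback cb = [&keys] (std::string&& key, std::string&& marker) {
//     keys.push_back(std::move(key));
//     return keys.size() < 10;   // returning false stops the listing
//   };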
575
576 /// lists metadata keys, passing each to a callback until it returns false.
577 /// on reaching the end, it will restart at the beginning and list up to the
578 /// initial marker
579 class AsyncMetadataList : public RGWAsyncRadosRequest {
580 CephContext *const cct;
581 RGWMetadataManager *const mgr;
582 const std::string section;
583 const std::string start_marker;
584 MetadataListCallback callback;
585
586 int _send_request() override;
587 public:
588 AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
589 RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
590 const std::string& section, const std::string& start_marker,
591 const MetadataListCallback& callback)
592 : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
593 section(section), start_marker(start_marker), callback(callback)
594 {}
595 };
596
597 int AsyncMetadataList::_send_request()
598 {
599 void* handle = nullptr;
600 std::list<std::string> keys;
601 bool truncated{false};
602 std::string marker;
603
604 // start a listing at the given marker
605 int r = mgr->list_keys_init(section, start_marker, &handle);
606 if (r == -EINVAL) {
607 // restart with empty marker below
608 } else if (r < 0) {
609 ldout(cct, 10) << "failed to init metadata listing: "
610 << cpp_strerror(r) << dendl;
611 return r;
612 } else {
613 ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;
614
615 // release the handle when scope exits
616 auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
617
618 do {
619 // get the next key and marker
620 r = mgr->list_keys_next(handle, 1, keys, &truncated);
621 if (r < 0) {
622 ldout(cct, 10) << "failed to list metadata: "
623 << cpp_strerror(r) << dendl;
624 return r;
625 }
626 marker = mgr->get_marker(handle);
627
628 if (!keys.empty()) {
629 ceph_assert(keys.size() == 1);
630 auto& key = keys.front();
631 if (!callback(std::move(key), std::move(marker))) {
632 return 0;
633 }
634 }
635 } while (truncated);
636
637 if (start_marker.empty()) {
638 // already listed all keys
639 return 0;
640 }
641 }
642
643 // restart the listing from the beginning (empty marker)
644 handle = nullptr;
645
646 r = mgr->list_keys_init(section, "", &handle);
647 if (r < 0) {
648 ldout(cct, 10) << "failed to restart metadata listing: "
649 << cpp_strerror(r) << dendl;
650 return r;
651 }
652 ldout(cct, 20) << "restarting metadata listing" << dendl;
653
654 // release the handle when scope exits
655 auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
656 do {
657 // get the next key and marker
658 r = mgr->list_keys_next(handle, 1, keys, &truncated);
659 if (r < 0) {
660 ldout(cct, 10) << "failed to list metadata: "
661 << cpp_strerror(r) << dendl;
662 return r;
663 }
664 marker = mgr->get_marker(handle);
665
666 if (!keys.empty()) {
667 ceph_assert(keys.size() == 1);
668 auto& key = keys.front();
669 // stop at original marker
670 if (marker >= start_marker) {
671 return 0;
672 }
673 if (!callback(std::move(key), std::move(marker))) {
674 return 0;
675 }
676 }
677 } while (truncated);
678
679 return 0;
680 }
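// Net effect of the two passes above: keys after start_marker are visited
// first, then the listing restarts from the beginning and stops once it
// reaches start_marker again, so one call cycles through roughly the whole
// section once unless the callback ends it early.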
681
682 /// coroutine wrapper for AsyncMetadataList
683 class MetadataListCR : public RGWSimpleCoroutine {
684 RGWAsyncRadosProcessor *const async_rados;
685 RGWMetadataManager *const mgr;
686 const std::string& section;
687 const std::string& start_marker;
688 MetadataListCallback callback;
689 RGWAsyncRadosRequest *req{nullptr};
690 public:
691 MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
692 RGWMetadataManager *mgr, const std::string& section,
693 const std::string& start_marker,
694 const MetadataListCallback& callback)
695 : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
696 section(section), start_marker(start_marker), callback(callback)
697 {}
698 ~MetadataListCR() override {
699 request_cleanup();
700 }
701
702 int send_request() override {
703 req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
704 mgr, section, start_marker, callback);
705 async_rados->queue(req);
706 return 0;
707 }
708 int request_complete() override {
709 return req->get_ret_status();
710 }
711 void request_cleanup() override {
712 if (req) {
713 req->finish();
714 req = nullptr;
715 }
716 }
717 };
718
719 class BucketTrimCR : public RGWCoroutine {
720 RGWRados *const store;
721 RGWHTTPManager *const http;
722 const BucketTrimConfig& config;
723 BucketTrimObserver *const observer;
724 const rgw_raw_obj& obj;
725 ceph::mono_time start_time;
726 bufferlist notify_replies;
727 BucketChangeCounter counter;
728 std::vector<std::string> buckets; //< buckets selected for trim
729 BucketTrimStatus status;
730 RGWObjVersionTracker objv; //< version tracker for trim status object
731 std::string last_cold_marker; //< position for next trim marker
732
733 static const std::string section; //< metadata section for bucket instances
734 public:
735 BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
736 const BucketTrimConfig& config, BucketTrimObserver *observer,
737 const rgw_raw_obj& obj)
738 : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
739 observer(observer), obj(obj), counter(config.counter_size)
740 {}
741
742 int operate() override;
743 };
744
745 const std::string BucketTrimCR::section{"bucket.instance"};
746
747 int BucketTrimCR::operate()
748 {
749 reenter(this) {
750 start_time = ceph::mono_clock::now();
751
752 if (config.buckets_per_interval) {
753 // query watch/notify for hot buckets
754 ldout(cct, 10) << "fetching active bucket counters" << dendl;
755 set_status("fetching active bucket counters");
756 yield {
757 // request the top bucket counters from each peer gateway
758 const TrimNotifyType type = NotifyTrimCounters;
759 TrimCounters::Request request{32};
760 bufferlist bl;
761 encode(type, bl);
762 encode(request, bl);
763 call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
764 &notify_replies));
765 }
766 if (retcode < 0) {
767 ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
768 return set_cr_error(retcode);
769 }
770
771 // select the hottest buckets for trim
772 retcode = accumulate_peer_counters(notify_replies, counter);
773 if (retcode < 0) {
774 ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
775 return set_cr_error(retcode);
776 }
777 buckets.reserve(config.buckets_per_interval);
778
779 const int max_count = config.buckets_per_interval -
780 config.min_cold_buckets_per_interval;
781 counter.get_highest(max_count,
782 [this] (const std::string& bucket, int count) {
783 buckets.push_back(bucket);
784 });
785 }
786
787 if (buckets.size() < config.buckets_per_interval) {
788 // read BucketTrimStatus for marker position
789 set_status("reading trim status");
790 using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
791 yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
792 &status, true, &objv));
793 if (retcode < 0) {
794 ldout(cct, 10) << "failed to read bilog trim status: "
795 << cpp_strerror(retcode) << dendl;
796 return set_cr_error(retcode);
797 }
798 if (status.marker == "MAX") {
799 status.marker.clear(); // restart at the beginning
800 }
801 ldout(cct, 10) << "listing cold buckets from marker="
802 << status.marker << dendl;
803
804 set_status("listing cold buckets for trim");
805 yield {
806 // capture a reference so 'this' remains valid in the callback
807 auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
808 // list cold buckets to consider for trim
809 auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
810 // filter out keys that we trimmed recently
811 if (observer->trimmed_recently(bucket)) {
812 return true;
813 }
814 // filter out active buckets that we've already selected
815 auto i = std::find(buckets.begin(), buckets.end(), bucket);
816 if (i != buckets.end()) {
817 return true;
818 }
819 buckets.emplace_back(std::move(bucket));
820 // remember the last cold bucket spawned to update the status marker
821 last_cold_marker = std::move(marker);
822 // return true if there's room for more
823 return buckets.size() < config.buckets_per_interval;
824 };
825
826 call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
827 section, status.marker, cb));
828 }
829 if (retcode < 0) {
830 ldout(cct, 4) << "failed to list bucket instance metadata: "
831 << cpp_strerror(retcode) << dendl;
832 return set_cr_error(retcode);
833 }
834 }
835
836 // trim bucket instances with limited concurrency
837 set_status("trimming buckets");
838 ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
839 yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
840 config.concurrent_buckets));
841 // ignore errors from individual buckets
842
843 // write updated trim status
844 if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
845 set_status("writing updated trim status");
846 status.marker = std::move(last_cold_marker);
847 ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
848 using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
849 yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
850 status, &objv));
851 if (retcode < 0) {
852 ldout(cct, 4) << "failed to write updated trim status: "
853 << cpp_strerror(retcode) << dendl;
854 return set_cr_error(retcode);
855 }
856 }
857
858 // notify peers that trim completed
859 set_status("trim completed");
860 yield {
861 const TrimNotifyType type = NotifyTrimComplete;
862 TrimComplete::Request request;
863 bufferlist bl;
864 encode(type, bl);
865 encode(request, bl);
866 call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
867 nullptr));
868 }
869 if (retcode < 0) {
870 ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
871 return set_cr_error(retcode);
872 }
873
874 ldout(cct, 4) << "bucket index log processing completed in "
875 << ceph::mono_clock::now() - start_time << dendl;
876 return set_cr_done();
877 }
878 return 0;
879 }
880
881 class BucketTrimPollCR : public RGWCoroutine {
882 RGWRados *const store;
883 RGWHTTPManager *const http;
884 const BucketTrimConfig& config;
885 BucketTrimObserver *const observer;
886 const rgw_raw_obj& obj;
887 const std::string name{"trim"}; //< lock name
888 const std::string cookie;
889
890 public:
891 BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
892 const BucketTrimConfig& config,
893 BucketTrimObserver *observer, const rgw_raw_obj& obj)
894 : RGWCoroutine(store->ctx()), store(store), http(http),
895 config(config), observer(observer), obj(obj),
896 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
897 {}
898
899 int operate() override;
900 };
901
902 int BucketTrimPollCR::operate()
903 {
904 reenter(this) {
905 for (;;) {
906 set_status("sleeping");
907 wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});
908
909 // prevent others from trimming for our entire wait interval
910 set_status("acquiring trim lock");
911 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
912 obj, name, cookie,
913 config.trim_interval_sec));
914 if (retcode < 0) {
915 ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
916 continue;
917 }
918
919 set_status("trimming");
920 yield call(new BucketTrimCR(store, http, config, observer, obj));
921 if (retcode < 0) {
922 // on errors, unlock so other gateways can try
923 set_status("unlocking");
924 yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
925 obj, name, cookie));
926 }
927 }
928 }
929 return 0;
930 }
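// Note the lock handling above: on success the lock is intentionally left in
// place so that no other gateway re-runs trim before trim_interval_sec
// expires; it is only released early when BucketTrimCR fails, so a peer can
// retry sooner.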
931
932 /// tracks a bounded list of events with timestamps. old events can be expired,
933 /// and recent events can be searched by key. expiration depends on events being
934 /// inserted in temporal order
935 template <typename T, typename Clock = ceph::coarse_mono_clock>
936 class RecentEventList {
937 public:
938 using clock_type = Clock;
939 using time_point = typename clock_type::time_point;
940
941 RecentEventList(size_t max_size, const ceph::timespan& max_duration)
942 : events(max_size), max_duration(max_duration)
943 {}
944
945 /// insert an event at the given point in time. this time must be at least as
946 /// recent as the last inserted event
947 void insert(T&& value, const time_point& now) {
948 // ceph_assert(events.empty() || now >= events.back().time)
949 events.push_back(Event{std::move(value), now});
950 }
951
952 /// performs a linear search for an event matching the given key, whose type
953 /// U can be any that provides operator==(U, T)
954 template <typename U>
955 bool lookup(const U& key) const {
956 for (const auto& event : events) {
957 if (key == event.value) {
958 return true;
959 }
960 }
961 return false;
962 }
963
964 /// remove events that are no longer recent compared to the given point in time
965 void expire_old(const time_point& now) {
966 const auto expired_before = now - max_duration;
967 while (!events.empty() && events.front().time < expired_before) {
968 events.pop_front();
969 }
970 }
971
972 private:
973 struct Event {
974 T value;
975 time_point time;
976 };
977 boost::circular_buffer<Event> events;
978 const ceph::timespan max_duration;
979 };
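// A short usage sketch (sizes and keys below are illustrative):
//
//   RecentEventList<std::string> recent(128, std::chrono::hours(2));
//   recent.insert("bucket:instance-key", ceph::coarse_mono_clock::now());
//   bool hit = recent.lookup("bucket:instance-key");    // linear search
//   recent.expire_old(ceph::coarse_mono_clock::now());  // drop entries older
//                                                       // than two hours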
980
981 namespace rgw {
982
983 // read bucket trim configuration from ceph context
984 void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
985 {
986 const auto& conf = cct->_conf;
987
988 config.trim_interval_sec =
989 conf.get_val<int64_t>("rgw_sync_log_trim_interval");
990 config.counter_size = 512;
991 config.buckets_per_interval =
992 conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
993 config.min_cold_buckets_per_interval =
994 conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
995 config.concurrent_buckets =
996 conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
997 config.notify_timeout_ms = 10000;
998 config.recent_size = 128;
999 config.recent_duration = std::chrono::hours(2);
1000 }
1001
1002 class BucketTrimManager::Impl : public TrimCounters::Server,
1003 public BucketTrimObserver {
1004 public:
1005 RGWRados *const store;
1006 const BucketTrimConfig config;
1007
1008 const rgw_raw_obj status_obj;
1009
1010 /// count frequency of bucket instance entries in the data changes log
1011 BucketChangeCounter counter;
1012
1013 using RecentlyTrimmedBucketList = RecentEventList<std::string>;
1014 using clock_type = RecentlyTrimmedBucketList::clock_type;
1015 /// track recently trimmed buckets to focus trim activity elsewhere
1016 RecentlyTrimmedBucketList trimmed;
1017
1018 /// serve the bucket trim watch/notify api
1019 BucketTrimWatcher watcher;
1020
1021 /// protect data shared between data sync, trim, and watch/notify threads
1022 std::mutex mutex;
1023
1024 Impl(RGWRados *store, const BucketTrimConfig& config)
1025 : store(store), config(config),
1026 status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
1027 counter(config.counter_size),
1028 trimmed(config.recent_size, config.recent_duration),
1029 watcher(store, status_obj, this)
1030 {}
1031
1032 /// TrimCounters::Server interface for watch/notify api
1033 void get_bucket_counters(int count, TrimCounters::Vector& buckets) override {
1034 buckets.reserve(count);
1035 std::lock_guard<std::mutex> lock(mutex);
1036 counter.get_highest(count, [&buckets] (const std::string& key, int count) {
1037 buckets.emplace_back(key, count);
1038 });
1039 ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
1040 }
1041
1042 void reset_bucket_counters() override {
1043 ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
1044 std::lock_guard<std::mutex> lock(mutex);
1045 counter.clear();
1046 trimmed.expire_old(clock_type::now());
1047 }
1048
1049 /// BucketTrimObserver interface to remember successfully-trimmed buckets
1050 void on_bucket_trimmed(std::string&& bucket_instance) override {
1051 ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
1052 std::lock_guard<std::mutex> lock(mutex);
1053 trimmed.insert(std::move(bucket_instance), clock_type::now());
1054 }
1055
1056 bool trimmed_recently(const boost::string_view& bucket_instance) override {
1057 std::lock_guard<std::mutex> lock(mutex);
1058 return trimmed.lookup(bucket_instance);
1059 }
1060 };
1061
1062 BucketTrimManager::BucketTrimManager(RGWRados *store,
1063 const BucketTrimConfig& config)
1064 : impl(new Impl(store, config))
1065 {
1066 }
1067 BucketTrimManager::~BucketTrimManager() = default;
1068
1069 int BucketTrimManager::init()
1070 {
1071 return impl->watcher.start();
1072 }
1073
1074 void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
1075 {
1076 std::lock_guard<std::mutex> lock(impl->mutex);
1077 // filter recently trimmed bucket instances out of bucket change counter
1078 if (impl->trimmed.lookup(bucket)) {
1079 return;
1080 }
1081 impl->counter.insert(bucket.to_string());
1082 }
1083
1084 RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
1085 {
1086 return new BucketTrimPollCR(impl->store, http, impl->config,
1087 impl.get(), impl->status_obj);
1088 }
1089
1090 RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
1091 {
1092 // return the trim coroutine without any polling
1093 return new BucketTrimCR(impl->store, http, impl->config,
1094 impl.get(), impl->status_obj);
1095 }
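// A rough sketch of how these entry points fit together (http_manager and the
// call sites are illustrative; the real wiring lives in the sync machinery):
//
//   rgw::BucketTrimConfig config;
//   rgw::configure_bucket_trim(store->ctx(), config);
//   rgw::BucketTrimManager trim(store, config);
//   int r = trim.init();                       // registers the watch/notify handler
//   // data sync reports activity:   trim.on_bucket_changed(bucket_instance_key);
//   // one gateway polls for trim:   trim.create_bucket_trim_cr(&http_manager);
//   // radosgw-admin trims once:     trim.create_admin_bucket_trim_cr(&http_manager);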
1096
1097 } // namespace rgw