// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#include <mutex>
#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>

#include "include/scope_guard.h"
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_trim_bilog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_sal.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_bucket.h"

#include "services/svc_zone.h"
#include "services/svc_meta.h"

#include <boost/asio/yield.hpp>
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "trim: ")

using namespace std;

using rgw::BucketTrimConfig;
using BucketChangeCounter = BoundedKeyCounter<std::string, int>;

const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
using rgw::BucketTrimStatus;


// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0,
  NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);
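// each notify payload encodes a TrimNotifyType first, followed by the
// type-specific Request; handlers reply with the matching Response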

struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};

/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
{
  return out << rhs.bucket << ":" << rhs.count;
}

void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  using ceph::encode;
  // no versioning to save space
  encode(bucket, bl);
  encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
  using ceph::decode;
  decode(bucket, p);
  decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);

void TrimCounters::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);

void TrimCounters::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);

void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);
  auto count = std::min<uint16_t>(request.max_buckets, 128);

  Response response;
  server->get_bucket_counters(count, response.bucket_counters);
  encode(response, output);
}

/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  /// notify handler
  class Handler : public TrimNotifyHandler {
    Server *const server;
   public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};

void TrimComplete::Request::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);

void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
                                   bufferlist& output)
{
  Request request;
  decode(request, input);

  server->reset_bucket_counters();

  Response response;
  encode(response, output);
}


/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
  rgw::sal::RadosStore* const store;
  const rgw_raw_obj& obj;
  rgw_rados_ref ref;
  uint64_t handle{0};

  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;

 public:
  BucketTrimWatcher(rgw::sal::RadosStore* store, const rgw_raw_obj& obj,
                    TrimCounters::Server *counters)
    : store(store), obj(obj) {
    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
  }

  ~BucketTrimWatcher() {
    stop();
  }

  int start(const DoutPrefixProvider *dpp) {
    int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
    if (r < 0) {
      return r;
    }

    // register a watch on the realm's control object
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r == -ENOENT) {
      constexpr bool exclusive = true;
      r = ref.pool.ioctx().create(ref.obj.oid, exclusive);
      if (r == -EEXIST || r == 0) {
        r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
      }
    }
    if (r < 0) {
      ldpp_dout(dpp, -1) << "Failed to watch " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
      return r;
    }

    ldpp_dout(dpp, 10) << "Watching " << ref.obj.oid << dendl;
    return 0;
  }

  int restart() {
    int r = ref.pool.ioctx().unwatch2(handle);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
    }
    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
    if (r < 0) {
      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
          << " with " << cpp_strerror(-r) << dendl;
      ref.pool.ioctx().close();
    }
    return r;
  }

  void stop() {
    if (handle) {
      ref.pool.ioctx().unwatch2(handle);
      ref.pool.ioctx().close();
    }
  }

  /// respond to bucket trim notifications
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_id, bufferlist& bl) override {
    if (cookie != handle) {
      return;
    }
    bufferlist reply;
    try {
      auto p = bl.cbegin();
      TrimNotifyType type;
      decode(type, p);

      auto handler = handlers.find(type);
      if (handler != handlers.end()) {
        handler->second->handle(p, reply);
      } else {
        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
      }
    } catch (const buffer::error& e) {
      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
    }
    ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply);
  }

  /// reestablish the watch if it gets disconnected
  void handle_error(uint64_t cookie, int err) override {
    if (cookie != handle) {
      return;
    }
    if (err == -ENOTCONN) {
      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
      restart();
    }
  }
};


/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0;
};

/// populate the status with the minimum stable marker of each shard
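/// (e.g. peers reporting shard markers {"0005", "0009"} and {"0003", "0012"}
/// produce {"0003", "0009"}; a peer shard still in StateInit is skipped)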
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  for (auto peer = first; peer != last; ++peer) {
    if (peer->size() != status->size()) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // if no sync has started, we can safely trim everything
      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}

/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* const store;
  const RGWBucketInfo& bucket_info;
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(const DoutPrefixProvider *dpp,
                           rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      dpp(dpp), store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};

bool BucketTrimShardCollectCR::spawn_next()
{
  while (i < markers.size()) {
    const auto& marker = markers[i];
    const auto shard_id = i++;

    // skip empty markers
    if (!marker.empty()) {
      ldpp_dout(dpp, 10) << "trimming bilog shard " << shard_id
          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
      spawn(new RGWRadosBILogTrimCR(dpp, store, bucket_info, shard_id,
                                    std::string{}, marker),
            false);
      return true;
    }
  }
  return false;
}

/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  rgw_bucket_get_sync_policy_params get_policy_params;
  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
  rgw_bucket bucket;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo _bucket_info;
  const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
  int child_ret = 0;
  const DoutPrefixProvider *dpp;

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance,
                       const DoutPrefixProvider *dpp)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->svc()->zone->get_zone().id),
      dpp(dpp) {
    rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
    source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    ldpp_dout(dpp, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    get_policy_params.zone = zone_id;
    get_policy_params.bucket = bucket;
    yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
                                                   store,
                                                   get_policy_params,
                                                   source_policy,
                                                   dpp));
    if (retcode < 0) {
      if (retcode != -ENOENT) {
        ldpp_dout(dpp, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
      }

      return set_cr_error(retcode);
    }

    if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
        opt_bucket_info) {
      pbucket_info = &(*opt_bucket_info);
    } else {
      /* this shouldn't really happen */
      return set_cr_error(-ENOENT);
    }

    // query peers for sync status
    set_status("fetching sync status from relevant peers");
    yield {
      const auto& all_dests = source_policy->policy_handler->get_all_dests();

      vector<rgw_zone_id> zids;
      rgw_zone_id last_zid;
      for (auto& diter : all_dests) {
        const auto& zid = diter.first;
        if (zid == last_zid) {
          continue;
        }
        last_zid = zid;
        zids.push_back(zid);
      }

      peer_status.resize(zids.size());

      auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();

      auto p = peer_status.begin();
      for (auto& zid : zids) {
        // query data sync status from each sync peer
        rgw_http_param_pair params[] = {
          { "type", "bucket-index" },
          { "status", nullptr },
          { "options", "merge" },
          { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket
                                                    param is not provided */
          { "source-zone", zone_id.c_str() },
          { nullptr, nullptr }
        };

        auto ziter = zone_conn_map.find(zid);
        if (ziter == zone_conn_map.end()) {
          ldpp_dout(dpp, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
          return set_cr_error(-ECANCELED);
        }
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        drain_all();
        return set_cr_error(child_ret);
      }
    }

    // initialize each shard with the maximum marker, which is only used when
    // there are no peers syncing from us
    min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
                       RGWSyncLogTrimCR::max_marker);

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }

    // trim shards with a ShardCollectCR
    ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
       << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}

/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
  const DoutPrefixProvider *dpp;
 public:
  BucketTrimInstanceCollectCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent,
                              const DoutPrefixProvider *dpp)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end()),
      dpp(dpp)
  {}
  bool spawn_next() override;
};

bool BucketTrimInstanceCollectCR::spawn_next()
{
  if (bucket == end) {
    return false;
  }
  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket, dpp), false);
  ++bucket;
  return true;
}

/// correlate the replies from each peer gateway into the given counter
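/// (the notify reply bufferlist encodes a map of (notifier_id, cookie) -> reply,
/// followed by the set of peers that timed out, as decoded below)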
int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
{
  counter.clear();

  try {
    // decode notify responses
    auto p = bl.cbegin();
    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
    std::set<std::pair<uint64_t, uint64_t>> timeouts;
    decode(replies, p);
    decode(timeouts, p);

    for (auto& peer : replies) {
      auto q = peer.second.cbegin();
      TrimCounters::Response response;
      decode(response, q);
      for (const auto& b : response.bucket_counters) {
        counter.insert(b.bucket, b.count);
      }
    }
  } catch (const buffer::error& e) {
    return -EIO;
  }
  return 0;
}

/// metadata callback has the signature bool(string&& key, string&& marker)
using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;

/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;
  const std::string start_marker;
  MetadataListCallback callback;

  int _send_request(const DoutPrefixProvider *dpp) override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
};

int AsyncMetadataList::_send_request(const DoutPrefixProvider *dpp)
{
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  // start a listing at the given marker
  int r = mgr->list_keys_init(dpp, section, start_marker, &handle);
  if (r == -EINVAL) {
    // restart with empty marker below
  } else if (r < 0) {
    ldpp_dout(dpp, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  } else {
    ldpp_dout(dpp, 20) << "starting metadata listing at " << start_marker << dendl;

    // release the handle when scope exits
    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    do {
      // get the next key and marker
      r = mgr->list_keys_next(dpp, handle, 1, keys, &truncated);
      if (r < 0) {
        ldpp_dout(dpp, 10) << "failed to list metadata: "
            << cpp_strerror(r) << dendl;
        return r;
      }
      marker = mgr->get_marker(handle);

      if (!keys.empty()) {
        ceph_assert(keys.size() == 1);
        auto& key = keys.front();
        if (!callback(std::move(key), std::move(marker))) {
          return 0;
        }
      }
    } while (truncated);

    if (start_marker.empty()) {
      // already listed all keys
      return 0;
    }
  }

  // restart the listing from the beginning (empty marker)
  handle = nullptr;

  r = mgr->list_keys_init(dpp, section, "", &handle);
  if (r < 0) {
    ldpp_dout(dpp, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldpp_dout(dpp, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
  do {
    // get the next key and marker
    r = mgr->list_keys_next(dpp, handle, 1, keys, &truncated);
    if (r < 0) {
      ldpp_dout(dpp, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker
      if (marker > start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}

/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr};
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish();
      req = nullptr;
    }
  }
};

class BucketTrimCR : public RGWCoroutine {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  ceph::mono_time start_time;
  bufferlist notify_replies;
  BucketChangeCounter counter;
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker
  const DoutPrefixProvider *dpp;

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj, const DoutPrefixProvider *dpp)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size), dpp(dpp)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

const std::string BucketTrimCR::section{"bucket.instance"};

int BucketTrimCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldpp_dout(dpp, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        encode(type, bl);
        encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldpp_dout(dpp, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

      const int max_count = config.buckets_per_interval -
                            config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
        [this] (const std::string& bucket, int count) {
          buckets.push_back(bucket);
        });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldpp_dout(dpp, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldpp_dout(dpp, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->svc()->rados->get_async_processor(),
                                store->ctl()->meta.mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldpp_dout(dpp, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets, dpp));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldpp_dout(dpp, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldpp_dout(dpp, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      encode(type, bl);
      encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldpp_dout(dpp, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}

class BucketTrimPollCR : public RGWCoroutine {
  rgw::sal::RadosStore* const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj;
  const std::string name{"trim"}; //< lock name
  const std::string cookie;
  const DoutPrefixProvider *dpp;

 public:
  BucketTrimPollCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj,
                   const DoutPrefixProvider *dpp)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      dpp(dpp) {}

  int operate(const DoutPrefixProvider *dpp) override;
};

int BucketTrimPollCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});

      // prevent others from trimming for our entire wait interval
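      // (the lock is only released early on error below, so a successful run
      // holds it until it expires after trim_interval_sec)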
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj, dpp));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}

/// tracks a bounded list of events with timestamps. old events can be expired,
/// and recent events can be searched by key. expiration depends on events being
/// inserted in temporal order
template <typename T, typename Clock = ceph::coarse_mono_clock>
class RecentEventList {
 public:
  using clock_type = Clock;
  using time_point = typename clock_type::time_point;

  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
    : events(max_size), max_duration(max_duration)
  {}

  /// insert an event at the given point in time. this time must be at least as
  /// recent as the last inserted event
  void insert(T&& value, const time_point& now) {
    // ceph_assert(events.empty() || now >= events.back().time)
    events.push_back(Event{std::move(value), now});
  }

  /// performs a linear search for an event matching the given key, whose type
  /// U can be any that provides operator==(U, T)
  template <typename U>
  bool lookup(const U& key) const {
    for (const auto& event : events) {
      if (key == event.value) {
        return true;
      }
    }
    return false;
  }

  /// remove events that are no longer recent compared to the given point in time
  void expire_old(const time_point& now) {
    const auto expired_before = now - max_duration;
    while (!events.empty() && events.front().time < expired_before) {
      events.pop_front();
    }
  }

 private:
  struct Event {
    T value;
    time_point time;
  };
  boost::circular_buffer<Event> events;
  const ceph::timespan max_duration;
};

namespace rgw {

// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
  const auto& conf = cct->_conf;

  config.trim_interval_sec =
      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
  config.counter_size = 512;
  config.buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
  config.min_cold_buckets_per_interval =
      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
  config.concurrent_buckets =
      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
  config.notify_timeout_ms = 10000;
  config.recent_size = 128;
  config.recent_duration = std::chrono::hours(2);
}

class BucketTrimManager::Impl : public TrimCounters::Server,
                                public BucketTrimObserver {
 public:
  rgw::sal::RadosStore* const store;
  const BucketTrimConfig config;

  const rgw_raw_obj status_obj;

  /// count frequency of bucket instance entries in the data changes log
  BucketChangeCounter counter;

  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
  using clock_type = RecentlyTrimmedBucketList::clock_type;
  /// track recently trimmed buckets to focus trim activity elsewhere
  RecentlyTrimmedBucketList trimmed;

  /// serve the bucket trim watch/notify api
  BucketTrimWatcher watcher;

  /// protect data shared between data sync, trim, and watch/notify threads
  std::mutex mutex;

  Impl(rgw::sal::RadosStore* store, const BucketTrimConfig& config)
    : store(store), config(config),
      status_obj(store->get_zone()->get_params().log_pool, BucketTrimStatus::oid),
      counter(config.counter_size),
      trimmed(config.recent_size, config.recent_duration),
      watcher(store, status_obj, this)
  {}

  /// TrimCounters::Server interface for watch/notify api
  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
    buckets.reserve(count);
    std::lock_guard<std::mutex> lock(mutex);
    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
        buckets.emplace_back(key, count);
      });
    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
  }

  void reset_bucket_counters() override {
    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    counter.clear();
    trimmed.expire_old(clock_type::now());
  }

  /// BucketTrimObserver interface to remember successfully-trimmed buckets
  void on_bucket_trimmed(std::string&& bucket_instance) override {
    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
    std::lock_guard<std::mutex> lock(mutex);
    trimmed.insert(std::move(bucket_instance), clock_type::now());
  }

  bool trimmed_recently(const std::string_view& bucket_instance) override {
    std::lock_guard<std::mutex> lock(mutex);
    return trimmed.lookup(bucket_instance);
  }
};

BucketTrimManager::BucketTrimManager(rgw::sal::RadosStore* store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}
BucketTrimManager::~BucketTrimManager() = default;

int BucketTrimManager::init()
{
  return impl->watcher.start(this);
}

void BucketTrimManager::on_bucket_changed(const std::string_view& bucket)
{
  std::lock_guard<std::mutex> lock(impl->mutex);
  // filter recently trimmed bucket instances out of bucket change counter
  if (impl->trimmed.lookup(bucket)) {
    return;
  }
  impl->counter.insert(std::string(bucket));
}

RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj, this);
}

RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj, this);
}

CephContext* BucketTrimManager::get_cct() const
{
  return impl->store->ctx();
}

unsigned BucketTrimManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& BucketTrimManager::gen_prefix(std::ostream& out) const
{
  return out << "rgw bucket trim manager: ";
}

} // namespace rgw