// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+
/*
* Ceph - scalable distributed file system
*
*/
#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>
+#include "include/scope_guard.h"
#include "common/bounded_key_counter.h"
#include "common/errno.h"
#include "rgw_sync_log_trim.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_rados.h"
+#include "rgw_zone.h"
#include "rgw_sync.h"
+#include "services/svc_zone.h"
+
#include <boost/asio/yield.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#define dout_subsys ceph_subsys_rgw
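+/// interface for handling a single type of trim-related notify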
struct TrimNotifyHandler {
virtual ~TrimNotifyHandler() = default;
- virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
+ virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};
/// api to share the bucket trim counters between gateways in the same zone.
: bucket(bucket), count(count) {}
void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& p);
+ void decode(bufferlist::const_iterator& p);
};
using Vector = std::vector<BucketCounter>;
uint16_t max_buckets; //< maximum number of bucket counters to return
void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& p);
+ void decode(bufferlist::const_iterator& p);
};
/// return the current bucket trim counters
Vector bucket_counters;
void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& p);
+ void decode(bufferlist::const_iterator& p);
};
/// server interface to query the hottest buckets
class Handler : public TrimNotifyHandler {
Server *const server;
public:
- Handler(Server *server) : server(server) {}
+ explicit Handler(Server *server) : server(server) {}
- void handle(bufferlist::iterator& input, bufferlist& output) override;
+ void handle(bufferlist::const_iterator& input, bufferlist& output) override;
};
};
std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
+ using ceph::encode;
// no versioning to save space
- ::encode(bucket, bl);
- ::encode(count, bl);
+ encode(bucket, bl);
+ encode(count, bl);
}
-void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)
+void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
- ::decode(bucket, p);
- ::decode(count, p);
+ using ceph::decode;
+ decode(bucket, p);
+ decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
void TrimCounters::Request::encode(bufferlist& bl) const
{
ENCODE_START(1, 1, bl);
- ::encode(max_buckets, bl);
+ encode(max_buckets, bl);
ENCODE_FINISH(bl);
}
-void TrimCounters::Request::decode(bufferlist::iterator& p)
+void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
DECODE_START(1, p);
- ::decode(max_buckets, p);
+ decode(max_buckets, p);
DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);
void TrimCounters::Response::encode(bufferlist& bl) const
{
ENCODE_START(1, 1, bl);
- ::encode(bucket_counters, bl);
+ encode(bucket_counters, bl);
ENCODE_FINISH(bl);
}
-void TrimCounters::Response::decode(bufferlist::iterator& p)
+void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
DECODE_START(1, p);
- ::decode(bucket_counters, p);
+ decode(bucket_counters, p);
DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);
-void TrimCounters::Handler::handle(bufferlist::iterator& input,
+void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
bufferlist& output)
{
Request request;
- ::decode(request, input);
+ decode(request, input);
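+ // cap the number of bucket counters returned to the requester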
auto count = std::min<uint16_t>(request.max_buckets, 128);
Response response;
server->get_bucket_counters(count, response.bucket_counters);
- ::encode(response, output);
+ encode(response, output);
}
/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
struct Request {
void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& p);
+ void decode(bufferlist::const_iterator& p);
};
struct Response {
void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& p);
+ void decode(bufferlist::const_iterator& p);
};
/// server interface to reset bucket counters
class Handler : public TrimNotifyHandler {
Server *const server;
public:
- Handler(Server *server) : server(server) {}
+ explicit Handler(Server *server) : server(server) {}
- void handle(bufferlist::iterator& input, bufferlist& output) override;
+ void handle(bufferlist::const_iterator& input, bufferlist& output) override;
};
};
void TrimComplete::Request::encode(bufferlist& bl) const
{
ENCODE_START(1, 1, bl);
ENCODE_FINISH(bl);
}
-void TrimComplete::Request::decode(bufferlist::iterator& p)
+void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
DECODE_START(1, p);
DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);
void TrimComplete::Response::encode(bufferlist& bl) const
{
ENCODE_START(1, 1, bl);
ENCODE_FINISH(bl);
}
-void TrimComplete::Response::decode(bufferlist::iterator& p)
+void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
DECODE_START(1, p);
DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);
-void TrimComplete::Handler::handle(bufferlist::iterator& input,
+void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
bufferlist& output)
{
Request request;
- ::decode(request, input);
+ decode(request, input);
server->reset_bucket_counters();
Response response;
- ::encode(response, output);
+ encode(response, output);
}
}
// register a watch on the realm's control object
- r = ref.ioctx.watch2(ref.oid, &handle, this);
+ r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
if (r == -ENOENT) {
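+ // control object is missing; create it, then retry the watch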
constexpr bool exclusive = true;
- r = ref.ioctx.create(ref.oid, exclusive);
+ r = ref.ioctx.create(ref.obj.oid, exclusive);
if (r == -EEXIST || r == 0) {
- r = ref.ioctx.watch2(ref.oid, &handle, this);
+ r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
}
}
if (r < 0) {
- lderr(store->ctx()) << "Failed to watch " << ref.oid
+ lderr(store->ctx()) << "Failed to watch " << ref.obj
<< " with " << cpp_strerror(-r) << dendl;
ref.ioctx.close();
return r;
}
- ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;
+ ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
return 0;
}
int restart() {
int r = ref.ioctx.unwatch2(handle);
if (r < 0) {
- lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
+ lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
<< " with " << cpp_strerror(-r) << dendl;
}
- r = ref.ioctx.watch2(ref.oid, &handle, this);
+ r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
if (r < 0) {
- lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
+ lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
<< " with " << cpp_strerror(-r) << dendl;
ref.ioctx.close();
}
return r;
}
bufferlist reply;
try {
- auto p = bl.begin();
+ auto p = bl.cbegin();
TrimNotifyType type;
- ::decode(type, p);
+ decode(type, p);
auto handler = handlers.find(type);
if (handler != handlers.end()) {
} catch (const buffer::error& e) {
lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
}
- ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);
+ ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply);
}
/// reestablish the watch if it gets disconnected
return;
}
if (err == -ENOTCONN) {
- ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
+ ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
restart();
}
}
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
std::vector<std::string> *status)
{
- status->clear();
- boost::optional<size_t> num_shards;
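+ // *status* is pre-sized by the caller, so compare each peer against it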
for (auto peer = first; peer != last; ++peer) {
- const size_t peer_shards = peer->size();
- if (!num_shards) {
- num_shards = peer_shards;
- status->resize(*num_shards);
- } else if (*num_shards != peer_shards) {
+ if (peer->size() != status->size()) {
// all peers must agree on the number of shards
return -EINVAL;
}
std::string bucket_instance;
const std::string& zone_id; //< my zone id
RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
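+ // stored as a member because stack variables don't survive coroutine yields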
+ int child_ret = 0;
using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
std::vector<StatusShards> peer_status; //< sync status for each peer
: RGWCoroutine(store->ctx()), store(store),
http(http), observer(observer),
bucket_instance(bucket_instance),
- zone_id(store->get_zone().id),
- peer_status(store->zone_conn_map.size())
+ zone_id(store->svc.zone->get_zone().id),
+ peer_status(store->svc.zone->get_zone_data_notify_to_map().size())
{}
int operate() override;
};
auto p = peer_status.begin();
- for (auto& c : store->zone_conn_map) {
+ for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
using StatusCR = RGWReadRESTResourceCR<StatusShards>;
spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
false);
}
// wait for a response from each peer. all must respond to attempt trim
while (num_spawned()) {
- int child_ret;
yield wait_for_child();
collect(&child_ret, nullptr);
if (child_ret < 0) {
}
}
+ // initialize each shard with the maximum marker, which is only used when
+ // there are no peers syncing from us
+ min_markers.assign(std::max(1u, bucket_info.num_shards),
+ RGWSyncLogTrimCR::max_marker);
+
// determine the minimum marker for each shard
retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
&min_markers);
try {
// decode notify responses
- auto p = bl.begin();
+ auto p = bl.cbegin();
std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
std::set<std::pair<uint64_t, uint64_t>> timeouts;
- ::decode(replies, p);
- ::decode(timeouts, p);
+ decode(replies, p);
+ decode(timeouts, p);
for (auto& peer : replies) {
- auto q = peer.second.begin();
+ auto q = peer.second.cbegin();
TrimCounters::Response response;
- ::decode(response, q);
+ decode(response, q);
for (const auto& b : response.bucket_counters) {
counter.insert(b.bucket, b.count);
}
const std::string section;
const std::string start_marker;
MetadataListCallback callback;
- void *handle{nullptr};
int _send_request() override;
public:
: RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
section(section), start_marker(start_marker), callback(callback)
{}
- ~AsyncMetadataList() override {
- if (handle) {
- mgr->list_keys_complete(handle);
- }
- }
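+ // the listing handle is now local to _send_request(), released by scope guards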
};
int AsyncMetadataList::_send_request()
{
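+ // locals shared by the initial listing pass and the restart pass below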
+ void* handle = nullptr;
+ std::list<std::string> keys;
+ bool truncated{false};
+ std::string marker;
+
// start a listing at the given marker
int r = mgr->list_keys_init(section, start_marker, &handle);
- if (r < 0) {
+ if (r == -EINVAL) {
+ // restart with empty marker below
+ } else if (r < 0) {
ldout(cct, 10) << "failed to init metadata listing: "
<< cpp_strerror(r) << dendl;
return r;
- }
- ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;
-
- std::list<std::string> keys;
- bool truncated{false};
- std::string marker;
-
- do {
- // get the next key and marker
- r = mgr->list_keys_next(handle, 1, keys, &truncated);
- if (r < 0) {
- ldout(cct, 10) << "failed to list metadata: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- marker = mgr->get_marker(handle);
-
- if (!keys.empty()) {
- assert(keys.size() == 1);
- auto& key = keys.front();
- if (!callback(std::move(key), std::move(marker))) {
- return 0;
+ } else {
+ ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;
+
+ // release the handle when scope exits
+ auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
+
+ do {
+ // get the next key and marker
+ r = mgr->list_keys_next(handle, 1, keys, &truncated);
+ if (r < 0) {
+ ldout(cct, 10) << "failed to list metadata: "
+ << cpp_strerror(r) << dendl;
+ return r;
}
- }
- } while (truncated);
+ marker = mgr->get_marker(handle);
+
+ if (!keys.empty()) {
+ ceph_assert(keys.size() == 1);
+ auto& key = keys.front();
+ if (!callback(std::move(key), std::move(marker))) {
+ return 0;
+ }
+ }
+ } while (truncated);
- if (start_marker.empty()) {
- // already listed all keys
- return 0;
+ if (start_marker.empty()) {
+ // already listed all keys
+ return 0;
+ }
}
// restart the listing from the beginning (empty marker)
- mgr->list_keys_complete(handle);
handle = nullptr;
r = mgr->list_keys_init(section, "", &handle);
if (r < 0) {
ldout(cct, 10) << "failed to restart metadata listing: "
<< cpp_strerror(r) << dendl;
return r;
}
ldout(cct, 20) << "restarting metadata listing" << dendl;
+ // release the handle when scope exits
+ auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
do {
// get the next key and marker
r = mgr->list_keys_next(handle, 1, keys, &truncated);
if (r < 0) {
ldout(cct, 10) << "failed to list metadata: "
<< cpp_strerror(r) << dendl;
return r;
}
marker = mgr->get_marker(handle);
if (!keys.empty()) {
- assert(keys.size() == 1);
+ ceph_assert(keys.size() == 1);
auto& key = keys.front();
// stop at original marker
- if (marker >= start_marker) {
+ if (marker > start_marker) {
return 0;
}
if (!callback(std::move(key), std::move(marker))) {
observer(observer), obj(obj), counter(config.counter_size)
{}
- int operate();
+ int operate() override;
};
const std::string BucketTrimCR::section{"bucket.instance"};
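+ // ask each peer gateway for its most active buckets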
const TrimNotifyType type = NotifyTrimCounters;
TrimCounters::Request request{32};
bufferlist bl;
- ::encode(type, bl);
- ::encode(request, bl);
+ encode(type, bl);
+ encode(request, bl);
call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
¬ify_replies));
}
// read BucketTrimStatus for marker position
set_status("reading trim status");
using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
- yield call(new ReadStatus(store->get_async_rados(), store, obj,
+ yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
&status, true, &objv));
if (retcode < 0) {
ldout(cct, 10) << "failed to read bilog trim status: "
status.marker = std::move(last_cold_marker);
ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
- yield call(new WriteStatus(store->get_async_rados(), store, obj,
+ yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
status, &objv));
if (retcode < 0) {
ldout(cct, 4) << "failed to write updated trim status: "
const TrimNotifyType type = NotifyTrimComplete;
TrimComplete::Request request;
bufferlist bl;
- ::encode(type, bl);
- ::encode(request, bl);
+ encode(type, bl);
+ encode(request, bl);
call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
nullptr));
}
cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
{}
- int operate();
+ int operate() override;
};
int BucketTrimPollCR::operate()
{
reenter(this) {
for (;;) {
set_status("sleeping");
- wait(utime_t{config.trim_interval_sec, 0});
+ wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});
// prevent others from trimming for our entire wait interval
set_status("acquiring trim lock");
/// insert an event at the given point in time. this time must be at least as
/// recent as the last inserted event
void insert(T&& value, const time_point& now) {
- // assert(events.empty() || now >= events.back().time)
+ // ceph_assert(events.empty() || now >= events.back().time)
events.push_back(Event{std::move(value), now});
}
// read bucket trim configuration from ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
{
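+ // cct->_conf is no longer a pointer; bind a reference and use '.' accessors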
- auto conf = cct->_conf;
+ const auto& conf = cct->_conf;
config.trim_interval_sec =
- conf->get_val<int64_t>("rgw_sync_log_trim_interval");
+ conf.get_val<int64_t>("rgw_sync_log_trim_interval");
config.counter_size = 512;
config.buckets_per_interval =
- conf->get_val<int64_t>("rgw_sync_log_trim_max_buckets");
+ conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
config.min_cold_buckets_per_interval =
- conf->get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
+ conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
config.concurrent_buckets =
- conf->get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
+ conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
config.notify_timeout_ms = 10000;
config.recent_size = 128;
config.recent_duration = std::chrono::hours(2);
Impl(RGWRados *store, const BucketTrimConfig& config)
: store(store), config(config),
- status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
+ status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
counter(config.counter_size),
trimmed(config.recent_size, config.recent_duration),
watcher(store, status_obj, this)