]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_sync_log_trim.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_sync_log_trim.cc
index ffec32052370503df29b2503cf7eb61d95987229..a8a3fdee913b20a60a678cd820011da1c22a991e 100644 (file)
@@ -1,5 +1,6 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
+
 /*
  * Ceph - scalable distributed file system
  *
 #include "rgw_data_sync.h"
 #include "rgw_metadata.h"
 #include "rgw_rados.h"
+#include "rgw_zone.h"
 #include "rgw_sync.h"
 
+#include "services/svc_zone.h"
+
 #include <boost/asio/yield.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -53,7 +57,7 @@ WRITE_RAW_ENCODER(TrimNotifyType);
 struct TrimNotifyHandler {
   virtual ~TrimNotifyHandler() = default;
 
-  virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
+  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
 };
 
 /// api to share the bucket trim counters between gateways in the same zone.
@@ -70,7 +74,7 @@ struct TrimCounters {
       : bucket(bucket), count(count) {}
 
     void encode(bufferlist& bl) const;
-    void decode(bufferlist::iterator& p);
+    void decode(bufferlist::const_iterator& p);
   };
   using Vector = std::vector<BucketCounter>;
 
@@ -79,7 +83,7 @@ struct TrimCounters {
     uint16_t max_buckets; //< maximum number of bucket counters to return
 
     void encode(bufferlist& bl) const;
-    void decode(bufferlist::iterator& p);
+    void decode(bufferlist::const_iterator& p);
   };
 
   /// return the current bucket trim counters
@@ -87,7 +91,7 @@ struct TrimCounters {
     Vector bucket_counters;
 
     void encode(bufferlist& bl) const;
-    void decode(bufferlist::iterator& p);
+    void decode(bufferlist::const_iterator& p);
   };
 
   /// server interface to query the hottest buckets
@@ -102,9 +106,9 @@ struct TrimCounters {
   class Handler : public TrimNotifyHandler {
     Server *const server;
    public:
-    Handler(Server *server) : server(server) {}
+    explicit Handler(Server *server) : server(server) {}
 
-    void handle(bufferlist::iterator& input, bufferlist& output) override;
+    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
   };
 };
 std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
@@ -114,27 +118,29 @@ std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& r
 
 void TrimCounters::BucketCounter::encode(bufferlist& bl) const
 {
+  using ceph::encode;
   // no versioning to save space
-  ::encode(bucket, bl);
-  ::encode(count, bl);
+  encode(bucket, bl);
+  encode(count, bl);
 }
-void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)
+void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
 {
-  ::decode(bucket, p);
-  ::decode(count, p);
+  using ceph::decode;
+  decode(bucket, p);
+  decode(count, p);
 }
 WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
 
 void TrimCounters::Request::encode(bufferlist& bl) const
 {
   ENCODE_START(1, 1, bl);
-  ::encode(max_buckets, bl);
+  encode(max_buckets, bl);
   ENCODE_FINISH(bl);
 }
-void TrimCounters::Request::decode(bufferlist::iterator& p)
+void TrimCounters::Request::decode(bufferlist::const_iterator& p)
 {
   DECODE_START(1, p);
-  ::decode(max_buckets, p);
+  decode(max_buckets, p);
   DECODE_FINISH(p);
 }
 WRITE_CLASS_ENCODER(TrimCounters::Request);
@@ -142,27 +148,27 @@ WRITE_CLASS_ENCODER(TrimCounters::Request);
 void TrimCounters::Response::encode(bufferlist& bl) const
 {
   ENCODE_START(1, 1, bl);
-  ::encode(bucket_counters, bl);
+  encode(bucket_counters, bl);
   ENCODE_FINISH(bl);
 }
-void TrimCounters::Response::decode(bufferlist::iterator& p)
+void TrimCounters::Response::decode(bufferlist::const_iterator& p)
 {
   DECODE_START(1, p);
-  ::decode(bucket_counters, p);
+  decode(bucket_counters, p);
   DECODE_FINISH(p);
 }
 WRITE_CLASS_ENCODER(TrimCounters::Response);
 
-void TrimCounters::Handler::handle(bufferlist::iterator& input,
+void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
                                    bufferlist& output)
 {
   Request request;
-  ::decode(request, input);
+  decode(request, input);
   auto count = std::min<uint16_t>(request.max_buckets, 128);
 
   Response response;
   server->get_bucket_counters(count, response.bucket_counters);
-  ::encode(response, output);
+  encode(response, output);
 }
 
 /// api to notify peer gateways that trim has completed and their bucket change
@@ -170,11 +176,11 @@ void TrimCounters::Handler::handle(bufferlist::iterator& input,
 struct TrimComplete {
   struct Request {
     void encode(bufferlist& bl) const;
-    void decode(bufferlist::iterator& p);
+    void decode(bufferlist::const_iterator& p);
   };
   struct Response {
     void encode(bufferlist& bl) const;
-    void decode(bufferlist::iterator& p);
+    void decode(bufferlist::const_iterator& p);
   };
 
   /// server interface to reset bucket counters
@@ -184,9 +190,9 @@ struct TrimComplete {
   class Handler : public TrimNotifyHandler {
     Server *const server;
    public:
-    Handler(Server *server) : server(server) {}
+    explicit Handler(Server *server) : server(server) {}
 
-    void handle(bufferlist::iterator& input, bufferlist& output) override;
+    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
   };
 };
 
@@ -195,7 +201,7 @@ void TrimComplete::Request::encode(bufferlist& bl) const
   ENCODE_START(1, 1, bl);
   ENCODE_FINISH(bl);
 }
-void TrimComplete::Request::decode(bufferlist::iterator& p)
+void TrimComplete::Request::decode(bufferlist::const_iterator& p)
 {
   DECODE_START(1, p);
   DECODE_FINISH(p);
@@ -207,23 +213,23 @@ void TrimComplete::Response::encode(bufferlist& bl) const
   ENCODE_START(1, 1, bl);
   ENCODE_FINISH(bl);
 }
-void TrimComplete::Response::decode(bufferlist::iterator& p)
+void TrimComplete::Response::decode(bufferlist::const_iterator& p)
 {
   DECODE_START(1, p);
   DECODE_FINISH(p);
 }
 WRITE_CLASS_ENCODER(TrimComplete::Response);
 
-void TrimComplete::Handler::handle(bufferlist::iterator& input,
+void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
                                    bufferlist& output)
 {
   Request request;
-  ::decode(request, input);
+  decode(request, input);
 
   server->reset_bucket_counters();
 
   Response response;
-  ::encode(response, output);
+  encode(response, output);
 }
 
 
@@ -256,34 +262,34 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
     }
 
     // register a watch on the realm's control object
-    r = ref.ioctx.watch2(ref.oid, &handle, this);
+    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
     if (r == -ENOENT) {
       constexpr bool exclusive = true;
-      r = ref.ioctx.create(ref.oid, exclusive);
+      r = ref.ioctx.create(ref.obj.oid, exclusive);
       if (r == -EEXIST || r == 0) {
-        r = ref.ioctx.watch2(ref.oid, &handle, this);
+        r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
       }
     }
     if (r < 0) {
-      lderr(store->ctx()) << "Failed to watch " << ref.oid
+      lderr(store->ctx()) << "Failed to watch " << ref.obj
           << " with " << cpp_strerror(-r) << dendl;
       ref.ioctx.close();
       return r;
     }
 
-    ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;
+    ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
     return 0;
   }
 
   int restart() {
     int r = ref.ioctx.unwatch2(handle);
     if (r < 0) {
-      lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
+      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
           << " with " << cpp_strerror(-r) << dendl;
     }
-    r = ref.ioctx.watch2(ref.oid, &handle, this);
+    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
     if (r < 0) {
-      lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
+      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
           << " with " << cpp_strerror(-r) << dendl;
       ref.ioctx.close();
     }
@@ -305,9 +311,9 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
     }
     bufferlist reply;
     try {
-      auto p = bl.begin();
+      auto p = bl.cbegin();
       TrimNotifyType type;
-      ::decode(type, p);
+      decode(type, p);
 
       auto handler = handlers.find(type);
       if (handler != handlers.end()) {
@@ -318,7 +324,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
     } catch (const buffer::error& e) {
       lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
     }
-    ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);
+    ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply);
   }
 
   /// reestablish the watch if it gets disconnected
@@ -327,7 +333,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
       return;
     }
     if (err == -ENOTCONN) {
-      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
+      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
       restart();
     }
   }
@@ -347,14 +353,8 @@ template <typename Iter>
 int take_min_status(CephContext *cct, Iter first, Iter last,
                     std::vector<std::string> *status)
 {
-  status->clear();
-  boost::optional<size_t> num_shards;
   for (auto peer = first; peer != last; ++peer) {
-    const size_t peer_shards = peer->size();
-    if (!num_shards) {
-      num_shards = peer_shards;
-      status->resize(*num_shards);
-    } else if (*num_shards != peer_shards) {
+    if (peer->size() != status->size()) {
       // all peers must agree on the number of shards
       return -EINVAL;
     }
@@ -418,6 +418,7 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   std::string bucket_instance;
   const std::string& zone_id; //< my zone id
   RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+  int child_ret = 0;
 
   using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
   std::vector<StatusShards> peer_status; //< sync status for each peer
@@ -430,8 +431,8 @@ class BucketTrimInstanceCR : public RGWCoroutine {
     : RGWCoroutine(store->ctx()), store(store),
       http(http), observer(observer),
       bucket_instance(bucket_instance),
-      zone_id(store->get_zone().id),
-      peer_status(store->zone_conn_map.size())
+      zone_id(store->svc.zone->get_zone().id),
+      peer_status(store->svc.zone->get_zone_data_notify_to_map().size())
   {}
 
   int operate() override;
@@ -455,7 +456,7 @@ int BucketTrimInstanceCR::operate()
       };
 
       auto p = peer_status.begin();
-      for (auto& c : store->zone_conn_map) {
+      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
         using StatusCR = RGWReadRESTResourceCR<StatusShards>;
         spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
               false);
@@ -468,7 +469,6 @@ int BucketTrimInstanceCR::operate()
     }
     // wait for a response from each peer. all must respond to attempt trim
     while (num_spawned()) {
-      int child_ret;
       yield wait_for_child();
       collect(&child_ret, nullptr);
       if (child_ret < 0) {
@@ -477,6 +477,11 @@ int BucketTrimInstanceCR::operate()
       }
     }
 
+    // initialize each shard with the maximum marker, which is only used when
+    // there are no peers syncing from us
+    min_markers.assign(std::max(1u, bucket_info.num_shards),
+                       RGWSyncLogTrimCR::max_marker);
+
     // determine the minimum marker for each shard
     retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                               &min_markers);
@@ -542,16 +547,16 @@ int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
 
   try {
     // decode notify responses
-    auto p = bl.begin();
+    auto p = bl.cbegin();
     std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
     std::set<std::pair<uint64_t, uint64_t>> timeouts;
-    ::decode(replies, p);
-    ::decode(timeouts, p);
+    decode(replies, p);
+    decode(timeouts, p);
 
     for (auto& peer : replies) {
-      auto q = peer.second.begin();
+      auto q = peer.second.cbegin();
       TrimCounters::Response response;
-      ::decode(response, q);
+      decode(response, q);
       for (const auto& b : response.bucket_counters) {
         counter.insert(b.bucket, b.count);
       }
@@ -656,10 +661,10 @@ int AsyncMetadataList::_send_request()
     marker = mgr->get_marker(handle);
 
     if (!keys.empty()) {
-      assert(keys.size() == 1);
+      ceph_assert(keys.size() == 1);
       auto& key = keys.front();
       // stop at original marker
-      if (marker >= start_marker) {
+      if (marker > start_marker) {
         return 0;
       }
       if (!callback(std::move(key), std::move(marker))) {
@@ -731,7 +736,7 @@ class BucketTrimCR : public RGWCoroutine {
       observer(observer), obj(obj), counter(config.counter_size)
   {}
 
-  int operate();
+  int operate() override;
 };
 
 const std::string BucketTrimCR::section{"bucket.instance"};
@@ -750,8 +755,8 @@ int BucketTrimCR::operate()
         const TrimNotifyType type = NotifyTrimCounters;
         TrimCounters::Request request{32};
         bufferlist bl;
-        ::encode(type, bl);
-        ::encode(request, bl);
+        encode(type, bl);
+        encode(request, bl);
         call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                   &notify_replies));
       }
@@ -780,7 +785,7 @@ int BucketTrimCR::operate()
       // read BucketTrimStatus for marker position
       set_status("reading trim status");
       using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
-      yield call(new ReadStatus(store->get_async_rados(), store, obj,
+      yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
                                 &status, true, &objv));
       if (retcode < 0) {
         ldout(cct, 10) << "failed to read bilog trim status: "
@@ -838,7 +843,7 @@ int BucketTrimCR::operate()
       status.marker = std::move(last_cold_marker);
       ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
       using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
-      yield call(new WriteStatus(store->get_async_rados(), store, obj,
+      yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
                                  status, &objv));
       if (retcode < 0) {
         ldout(cct, 4) << "failed to write updated trim status: "
@@ -853,8 +858,8 @@ int BucketTrimCR::operate()
       const TrimNotifyType type = NotifyTrimComplete;
       TrimComplete::Request request;
       bufferlist bl;
-      ::encode(type, bl);
-      ::encode(request, bl);
+      encode(type, bl);
+      encode(request, bl);
       call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                 nullptr));
     }
@@ -888,7 +893,7 @@ class BucketTrimPollCR : public RGWCoroutine {
       cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
   {}
 
-  int operate();
+  int operate() override;
 };
 
 int BucketTrimPollCR::operate()
@@ -896,7 +901,7 @@ int BucketTrimPollCR::operate()
   reenter(this) {
     for (;;) {
       set_status("sleeping");
-      wait(utime_t{config.trim_interval_sec, 0});
+      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});
 
       // prevent others from trimming for our entire wait interval
       set_status("acquiring trim lock");
@@ -937,7 +942,7 @@ class RecentEventList {
   /// insert an event at the given point in time. this time must be at least as
   /// recent as the last inserted event
   void insert(T&& value, const time_point& now) {
-    // assert(events.empty() || now >= events.back().time)
+    // ceph_assert(events.empty() || now >= events.back().time)
     events.push_back(Event{std::move(value), now});
   }
 
@@ -975,17 +980,17 @@ namespace rgw {
 // read bucket trim configuration from ceph context
 void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
 {
-  auto conf = cct->_conf;
+  const auto& conf = cct->_conf;
 
   config.trim_interval_sec =
-      conf->get_val<int64_t>("rgw_sync_log_trim_interval");
+      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
   config.counter_size = 512;
   config.buckets_per_interval =
-      conf->get_val<int64_t>("rgw_sync_log_trim_max_buckets");
+      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
   config.min_cold_buckets_per_interval =
-      conf->get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
+      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
   config.concurrent_buckets =
-      conf->get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
+      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
   config.notify_timeout_ms = 10000;
   config.recent_size = 128;
   config.recent_duration = std::chrono::hours(2);
@@ -1015,7 +1020,7 @@ class BucketTrimManager::Impl : public TrimCounters::Server,
 
   Impl(RGWRados *store, const BucketTrimConfig& config)
     : store(store), config(config),
-      status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
+      status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
       counter(config.counter_size),
       trimmed(config.recent_size, config.recent_duration),
       watcher(store, status_obj, this)