+class RGWGCIOManager {
+ const DoutPrefixProvider* dpp;
+ CephContext *cct;
+ RGWGC *gc;
+
+ struct IO {
+ enum Type {
+ UnknownIO = 0,
+ TailIO = 1,
+ IndexIO = 2,
+ } type{UnknownIO};
+ librados::AioCompletion *c{nullptr};
+ string oid;
+ int index{-1};
+ string tag;
+ };
+
+ deque<IO> ios;
+ vector<std::vector<string> > remove_tags;
+ /* tracks the number of remaining shadow objects for a given tag in order to
+ * only remove the tag once all shadow objects have themselves been removed
+ */
+ vector<map<string, size_t> > tag_io_size;
+
+#define MAX_AIO_DEFAULT 10
+ size_t max_aio{MAX_AIO_DEFAULT};
+
+public:
+ RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
+ cct(_cct),
+ gc(_gc),
+ remove_tags(cct->_conf->rgw_gc_max_objs),
+ tag_io_size(cct->_conf->rgw_gc_max_objs) {
+ max_aio = cct->_conf->rgw_gc_max_concurrent_io;
+ }
+
+ ~RGWGCIOManager() {
+ for (auto io : ios) {
+ io.c->release();
+ }
+ }
+
+ int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
+ int index, const string& tag) {
+ while (ios.size() > max_aio) {
+ if (gc->going_down()) {
+ return 0;
+ }
+ auto ret = handle_next_completion();
+ //Return error if we are using queue, else ignore it
+ if (gc->transitioned_objects_cache[index] && ret < 0) {
+ return ret;
+ }
+ }
+
+ auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
+ int ret = ioctx->aio_operate(oid, c, op);
+ if (ret < 0) {
+ return ret;
+ }
+ ios.push_back(IO{IO::TailIO, c, oid, index, tag});
+
+ return 0;
+ }
+
+ int handle_next_completion() {
+ ceph_assert(!ios.empty());
+ IO& io = ios.front();
+ io.c->wait_for_complete();
+ int ret = io.c->get_return_value();
+ io.c->release();
+
+ if (ret == -ENOENT) {
+ ret = 0;
+ }
+
+ if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
+ io.index << " returned error, ret=" << ret << dendl;
+ }
+ goto done;
+ }
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
+ ", ret=" << ret << dendl;
+ goto done;
+ }
+
+ if (! gc->transitioned_objects_cache[io.index]) {
+ schedule_tag_removal(io.index, io.tag);
+ }
+
+ done:
+ ios.pop_front();
+ return ret;
+ }
+
+ /* This is a request to schedule a tag removal. It will be called once when
+ * there are no shadow objects. But it will also be called for every shadow
+ * object when there are any. Since we do not want the tag to be removed
+ * until all shadow objects have been successfully removed, the scheduling
+ * will not happen until the shadow object count goes down to zero
+ */
+ void schedule_tag_removal(int index, string tag) {
+ auto& ts = tag_io_size[index];
+ auto ts_it = ts.find(tag);
+ if (ts_it != ts.end()) {
+ auto& size = ts_it->second;
+ --size;
+ // wait all shadow obj delete return
+ if (size != 0)
+ return;
+
+ ts.erase(ts_it);
+ }
+
+ auto& rt = remove_tags[index];
+
+ rt.push_back(tag);
+ if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
+ flush_remove_tags(index, rt);
+ }
+ }
+
+ void add_tag_io_size(int index, string tag, size_t size) {
+ auto& ts = tag_io_size[index];
+ ts.emplace(tag, size);
+ }
+
+ int drain_ios() {
+ int ret_val = 0;
+ while (!ios.empty()) {
+ if (gc->going_down()) {
+ return -EAGAIN;
+ }
+ auto ret = handle_next_completion();
+ if (ret < 0) {
+ ret_val = ret;
+ }
+ }
+ return ret_val;
+ }
+
+ void drain() {
+ drain_ios();
+ flush_remove_tags();
+ /* the tags draining might have generated more ios, drain those too */
+ drain_ios();
+ }
+
+ void flush_remove_tags(int index, vector<string>& rt) {
+ IO index_io;
+ index_io.type = IO::IndexIO;
+ index_io.index = index;
+
+ ldpp_dout(dpp, 20) << __func__ <<
+ " removing entries from gc log shard index=" << index << ", size=" <<
+ rt.size() << ", entries=" << rt << dendl;
+
+ auto rt_guard = make_scope_guard(
+ [&]
+ {
+ rt.clear();
+ }
+ );
+
+ int ret = gc->remove(index, rt, &index_io.c);
+ if (ret < 0) {
+ /* we already cleared list of tags, this prevents us from
+ * ballooning in case of a persistent problem
+ */
+ ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
+ index << " ret=" << ret << dendl;
+ return;
+ }
+ if (perfcounter) {
+ /* log the count of tags retired for rate estimation */
+ perfcounter->inc(l_rgw_gc_retire, rt.size());
+ }
+ ios.push_back(index_io);
+ }
+
+ void flush_remove_tags() {
+ int index = 0;
+ for (auto& rt : remove_tags) {
+ if (! gc->transitioned_objects_cache[index]) {
+ flush_remove_tags(index, rt);
+ }
+ ++index;
+ }
+ }
+
+ int remove_queue_entries(int index, int num_entries) {
+ int ret = gc->remove(index, num_entries);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
+ index << " ret=" << ret << dendl;
+ return ret;
+ }
+ if (perfcounter) {
+ /* log the count of tags retired for rate estimation */
+ perfcounter->inc(l_rgw_gc_retire, num_entries);
+ }
+ return 0;
+ }
+}; // class RGWGCIOManger
+
+int RGWGC::process(int index, int max_secs, bool expired_only,
+ RGWGCIOManager& io_manager)