]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
update sources to v12.2.5
[ceph.git] / ceph / src / osd / PrimaryLogPG.cc
index 7aba1df5d2d9852b45dbc6ebc807c2273dd40fd6..1358eefa69a9d3451dbc4d4c4bf3dc1f47e54e79 100644 (file)
@@ -1574,8 +1574,10 @@ void PrimaryLogPG::calc_trim_to()
   if (limit != eversion_t() &&
       limit != pg_trim_to &&
       pg_log.get_log().approx_size() > target) {
-    size_t num_to_trim = pg_log.get_log().approx_size() - target;
-    if (num_to_trim < cct->_conf->osd_pg_log_trim_min) {
+    size_t num_to_trim = MIN(pg_log.get_log().approx_size() - target,
+                            cct->_conf->osd_pg_log_trim_max);
+    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+       cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
       return;
     }
     list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
@@ -2899,14 +2901,15 @@ void PrimaryLogPG::kick_proxy_ops_blocked(hobject_t& soid)
   in_progress_proxy_ops.erase(p);
 }
 
-void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
+void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
+                                    vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << prdop->soid << dendl;
   prdop->canceled = true;
 
   // cancel objecter op, if we can
   if (prdop->objecter_tid) {
-    osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
+    tids->push_back(prdop->objecter_tid);
     for (uint32_t i = 0; i < prdop->ops.size(); i++) {
       prdop->ops[i].outdata.clear();
     }
@@ -2915,20 +2918,20 @@ void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
   }
 }
 
-void PrimaryLogPG::cancel_proxy_ops(bool requeue)
+void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
 
   // cancel proxy reads
   map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
   while (p != proxyread_ops.end()) {
-    cancel_proxy_read((p++)->second);
+    cancel_proxy_read((p++)->second, tids);
   }
 
   // cancel proxy writes
   map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
   while (q != proxywrite_ops.end()) {
-    cancel_proxy_write((q++)->second);
+    cancel_proxy_write((q++)->second, tids);
   }
 
   if (requeue) {
@@ -3082,14 +3085,15 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
   pwop->ctx = NULL;
 }
 
-void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
+                                     vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << pwop->soid << dendl;
   pwop->canceled = true;
 
   // cancel objecter op, if we can
   if (pwop->objecter_tid) {
-    osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+    tids->push_back(pwop->objecter_tid);
     delete pwop->ctx;
     pwop->ctx = NULL;
     proxywrite_ops.erase(pwop->objecter_tid);
@@ -8026,7 +8030,9 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
     // FIXME: if the src etc match, we could avoid restarting from the
     // beginning.
     CopyOpRef cop = copy_ops[dest];
-    cancel_copy(cop, false);
+    vector<ceph_tid_t> tids;
+    cancel_copy(cop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
@@ -8106,6 +8112,7 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
 
 void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
 {
+  vector<ceph_tid_t> tids;
   dout(10) << __func__ << " " << oid << " tid " << tid
           << " " << cpp_strerror(r) << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
@@ -8292,7 +8299,7 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
        it != proxyread_ops.end();) {
       if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_read((it++)->second);
+       cancel_proxy_read((it++)->second, &tids);
       } else {
        ++it;
       }
@@ -8300,17 +8307,40 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
         it != proxywrite_ops.end();) {
       if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_write((it++)->second);
+       cancel_proxy_write((it++)->second, &tids);
       } else {
        ++it;
       }
     }
+    osd->objecter->op_cancel(tids, -ECANCELED);
     kick_proxy_ops_blocked(cobc->obs.oi.soid);
   }
 
   kick_object_context_blocked(cobc);
 }
 
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+  vector<ceph_tid_t> tids;
+  for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
+      it != proxyread_ops.end();) {
+    if (it->second->soid == oid) {
+      cancel_proxy_read((it++)->second, &tids);
+    } else {
+      ++it;
+    }
+  }
+  for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
+       it != proxywrite_ops.end();) {
+    if (it->second->soid == oid) {
+      cancel_proxy_write((it++)->second, &tids);
+    } else {
+      ++it;
+    }
+  }
+  osd->objecter->op_cancel(tids, -ECANCELED);
+  kick_proxy_ops_blocked(oid);
+}
+
 void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
 {
   dout(20) << __func__ << " " << cop
@@ -8645,7 +8675,8 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     agent_choose_mode();
 }
 
-void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
+void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
+                              vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << cop->obc->obs.oi.soid
           << " from " << cop->src << " " << cop->oloc
@@ -8653,10 +8684,10 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
 
   // cancel objecter op, if we can
   if (cop->objecter_tid) {
-    osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
+    tids->push_back(cop->objecter_tid);
     cop->objecter_tid = 0;
     if (cop->objecter_tid2) {
-      osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
+      tids->push_back(cop->objecter_tid2);
       cop->objecter_tid2 = 0;
     }
   }
@@ -8675,13 +8706,13 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
   cop->obc = ObjectContextRef();
 }
 
-void PrimaryLogPG::cancel_copy_ops(bool requeue)
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
   while (p != copy_ops.end()) {
     // requeue this op? can I queue up all of them?
-    cancel_copy((p++)->second, requeue);
+    cancel_copy((p++)->second, requeue, tids);
   }
 }
 
@@ -8817,7 +8848,9 @@ int PrimaryLogPG::start_flush(
       osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
       fop->dup_ops.pop_front();
     }
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   /**
@@ -9017,7 +9050,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
       return -EAGAIN;    // will retry
     } else {
       osd->logger->inc(l_osd_tier_try_flush_fail);
-      cancel_flush(fop, false);
+      vector<ceph_tid_t> tids;
+      cancel_flush(fop, false, &tids);
+      osd->objecter->op_cancel(tids, -ECANCELED);
       return -ECANCELED;
     }
   }
@@ -9056,7 +9091,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
     dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
     close_op_ctx(ctx.release());
     osd->logger->inc(l_osd_tier_try_flush_fail);
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
     return -ECANCELED;
   }
 
@@ -9096,15 +9133,22 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   return -EINPROGRESS;
 }
 
-void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
+void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+                               vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
           << fop->objecter_tid << dendl;
   if (fop->objecter_tid) {
-    osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
+    tids->push_back(fop->objecter_tid);
     fop->objecter_tid = 0;
   }
-  if (fop->blocking) {
+  if (fop->io_tids.size()) {
+    for (auto &p : fop->io_tids) {
+      tids->push_back(p.second);
+      p.second = 0;
+    } 
+  }
+  if (fop->blocking && fop->obc->is_blocked()) {
     fop->obc->stop_block();
     kick_object_context_blocked(fop->obc);
   }
@@ -9120,12 +9164,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
   flush_ops.erase(fop->obc->obs.oi.soid);
 }
 
-void PrimaryLogPG::cancel_flush_ops(bool requeue)
+void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
   while (p != flush_ops.end()) {
-    cancel_flush((p++)->second, requeue);
+    cancel_flush((p++)->second, requeue, tids);
   }
 }
 
@@ -9503,7 +9547,7 @@ void PrimaryLogPG::submit_log_entries(
     [this, entries, repop, on_complete]() {
       ObjectStore::Transaction t;
       eversion_t old_last_update = info.last_update;
-      merge_new_log_entries(entries, t);
+      merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
 
 
       set<pg_shard_t> waiting_on;
@@ -9522,7 +9566,9 @@ void PrimaryLogPG::submit_log_entries(
            pg_whoami.shard,
            get_osdmap()->get_epoch(),
            last_peering_reset,
-           repop->rep_tid);
+           repop->rep_tid,
+           pg_trim_to,
+           min_last_complete_ondisk);
          osd->send_message_osd_cluster(
            peer.osd, m, get_osdmap()->get_epoch());
          waiting_on.insert(peer);
@@ -9604,6 +9650,8 @@ void PrimaryLogPG::submit_log_entries(
       int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
       assert(r == 0);
     });
+
+  calc_trim_to();
 }
 
 void PrimaryLogPG::cancel_log_updates()
@@ -10646,7 +10694,16 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
     op->get_req());
   assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
   ObjectStore::Transaction t;
-  append_log_entries_update_missing(m->entries, t);
+  boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  if (m->pg_trim_to != eversion_t())
+    op_trim_to = m->pg_trim_to;
+  if (m->pg_roll_forward_to != eversion_t())
+    op_roll_forward_to = m->pg_roll_forward_to;
+
+  dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+
+  append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+  eversion_t new_lcod = info.last_complete;
 
   Context *complete = new FunctionContext(
     [=](int) {
@@ -10654,13 +10711,15 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
        op->get_req());
       lock();
       if (!pg_has_reset_since(msg->get_epoch())) {
+       update_last_complete_ondisk(new_lcod);
        MOSDPGUpdateLogMissingReply *reply =
          new MOSDPGUpdateLogMissingReply(
            spg_t(info.pgid.pgid, primary_shard().shard),
            pg_whoami.shard,
            msg->get_epoch(),
            msg->min_epoch,
-           msg->get_tid());
+           msg->get_tid(),
+           new_lcod);
        reply->set_priority(CEPH_MSG_PRIO_HIGH);
        msg->get_connection()->send_message(reply);
       }
@@ -10702,6 +10761,9 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op)
   if (it != log_entry_update_waiting_on.end()) {
     if (it->second.waiting_on.count(m->get_from())) {
       it->second.waiting_on.erase(m->get_from());
+      if (m->last_complete_ondisk != eversion_t()) {
+       update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk);
+      }
     } else {
       osd->clog->error()
        << info.pgid << " got reply "
@@ -11002,9 +11064,13 @@ void PrimaryLogPG::on_shutdown()
   scrub_clear_state();
 
   unreg_next_scrub();
-  cancel_copy_ops(false);
-  cancel_flush_ops(false);
-  cancel_proxy_ops(false);
+
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(false, &tids);
+  cancel_flush_ops(false, &tids);
+  cancel_proxy_ops(false, &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
+
   apply_and_flush_repops(false);
   cancel_log_updates();
   // we must remove PGRefs, so do this this prior to release_backoffs() callers
@@ -11109,9 +11175,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
 
   clear_scrub_reserved();
 
-  cancel_copy_ops(is_primary());
-  cancel_flush_ops(is_primary());
-  cancel_proxy_ops(is_primary());
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(is_primary(), &tids);
+  cancel_flush_ops(is_primary(), &tids);
+  cancel_proxy_ops(is_primary(), &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
 
   // requeue object waiters
   for (auto& p : waiting_for_unreadable_object) {
@@ -13754,6 +13822,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
   vector<snapid_t>::reverse_iterator curclone; // Defined only if snapset initialized
   unsigned missing = 0;
   inconsistent_snapset_wrapper soid_error, head_error;
+  unsigned soid_error_count = 0;
 
   bufferlist last_data;
 
@@ -13781,7 +13850,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       osd->clog->error() << mode << " " << info.pgid << " " << soid
                        << " no '" << OI_ATTR << "' attr";
       ++scrubber.shallow_errors;
-      soid_error.set_oi_attr_missing();
+      soid_error.set_info_missing();
     } else {
       bufferlist bv;
       bv.push_back(p->second.attrs[OI_ATTR]);
@@ -13793,8 +13862,8 @@ void PrimaryLogPG::scrub_snapshot_metadata(
        osd->clog->error() << mode << " " << info.pgid << " " << soid
                << " can't decode '" << OI_ATTR << "' attr " << e.what();
        ++scrubber.shallow_errors;
-       soid_error.set_oi_attr_corrupted();
-        soid_error.set_oi_attr_missing(); // Not available too
+       soid_error.set_info_corrupted();
+        soid_error.set_info_missing(); // Not available too
       }
     }
 
@@ -13881,6 +13950,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       ++scrubber.shallow_errors;
       soid_error.set_headless();
       scrubber.store->add_snap_error(pool.id, soid_error);
+      ++soid_error_count;
       if (head && soid.get_head() == head->get_head())
        head_error.set_clone(soid.snap);
       continue;
@@ -13895,12 +13965,13 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       }
 
       // Save previous head error information
-      if (head && head_error.errors)
+      if (head && (head_error.errors || soid_error_count))
        scrubber.store->add_snap_error(pool.id, head_error);
       // Set this as a new head object
       head = soid;
       missing = 0;
       head_error = soid_error;
+      soid_error_count = 0;
 
       dout(20) << __func__ << " " << mode << " new head " << head << dendl;
 
@@ -13909,7 +13980,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
                          << " no '" << SS_ATTR << "' attr";
         ++scrubber.shallow_errors;
        snapset = boost::none;
-       head_error.set_ss_attr_missing();
+       head_error.set_snapset_missing();
       } else {
        bufferlist bl;
        bl.push_back(p->second.attrs[SS_ATTR]);
@@ -13917,12 +13988,13 @@ void PrimaryLogPG::scrub_snapshot_metadata(
         try {
          snapset = SnapSet(); // Initialize optional<> before decoding into it
          ::decode(snapset.get(), blp);
+          head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]);
         } catch (buffer::error& e) {
          snapset = boost::none;
           osd->clog->error() << mode << " " << info.pgid << " " << soid
                << " can't decode '" << SS_ATTR << "' attr " << e.what();
          ++scrubber.shallow_errors;
-         head_error.set_ss_attr_corrupted();
+         head_error.set_snapset_corrupted();
         }
       }
 
@@ -13936,7 +14008,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
            osd->clog->error() << mode << " " << info.pgid << " " << soid
                               << " snaps.seq not set";
            ++scrubber.shallow_errors;
-           head_error.set_snapset_mismatch();
+           head_error.set_snapset_error();
           }
        }
 
@@ -14047,8 +14119,10 @@ void PrimaryLogPG::scrub_snapshot_metadata(
 
       // what's next?
       ++curclone;
-      if (soid_error.errors)
+      if (soid_error.errors) {
         scrubber.store->add_snap_error(pool.id, soid_error);
+       ++soid_error_count;
+      }
     }
 
     scrub_cstat.add(stat);
@@ -14068,7 +14142,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     log_missing(missing, head, osd->clog, info.pgid, __func__,
                mode, pool.info.allow_incomplete_clones());
   }
-  if (head && head_error.errors)
+  if (head && (head_error.errors || soid_error_count))
     scrubber.store->add_snap_error(pool.id, head_error);
 
   for (map<hobject_t,pair<uint32_t,uint32_t>>::const_iterator p =