git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/ReplicatedBackend.cc
update sources to v12.1.1
index def68de42274ef364980ad3c02e58550ef7ba361..7739602aeefe74d552c0087015d00528f2f0a522 100644
@@ -127,7 +127,7 @@ void ReplicatedBackend::run_recovery_op(
   delete h;
 }
 
-void ReplicatedBackend::recover_object(
+int ReplicatedBackend::recover_object(
   const hobject_t &hoid,
   eversion_t v,
   ObjectContextRef head,
@@ -145,15 +145,18 @@ void ReplicatedBackend::recover_object(
       hoid,
       head,
       h);
-    return;
   } else {
     assert(obc);
     int started = start_pushes(
       hoid,
       obc,
       h);
-    assert(started > 0);
+    if (started < 0) {
+      pushing[hoid].clear();
+      return started;
+    }
   }
+  return 0;
 }
 
 void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
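
The hunk above changes recover_object() (and, further down, start_pushes()) from a void function that asserts on failure into one that returns a negative errno and clears the partial per-object push state. A minimal standalone sketch of that error-propagation pattern, using hypothetical names rather than Ceph's real types:

#include <cerrno>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for the real OSD types, for illustration only.
std::map<std::string, std::vector<int>> pushing;   // oid -> queued pushes

int start_pushes(const std::string& oid) {
  return -EIO;                        // pretend a local read failed
}

int recover_object(const std::string& oid) {
  int started = start_pushes(oid);
  if (started < 0) {
    pushing[oid].clear();             // drop the partial push state
    return started;                   // propagate the error instead of asserting
  }
  return 0;                           // recovery of this object is underway
}
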
@@ -358,10 +361,10 @@ void generate_transaction(
     le.mark_unrollbackable();
     auto oiter = pgt->op_map.find(le.soid);
     if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
-      vector<snapid_t> snaps(
-       oiter->second.updated_snaps->second.begin(),
-       oiter->second.updated_snaps->second.end());
-      ::encode(snaps, le.snaps);
+      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
+      ::encode(oiter->second.updated_snaps->second, bl);
+      le.snaps.swap(bl);
+      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
     }
   }
 
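
The generate_transaction() hunk drops the temporary vector<snapid_t> and encodes the updated_snaps set straight into a pre-sized buffer, which is then swapped into the log entry and accounted to the osd_pglog mempool. A rough sketch of the copy-avoiding idea with plain std:: types (toy encoding, not Ceph's bufferlist):

#include <cstdint>
#include <set>
#include <string>
#include <utility>

struct LogEntry { std::string snaps; };   // stand-in for pg_log_entry_t::snaps

void encode_snaps(const std::set<uint64_t>& updated_snaps, LogEntry& le) {
  std::string buf;
  buf.reserve(updated_snaps.size() * 8 + 8);          // pre-size, as in the diff
  for (uint64_t s : updated_snaps)                    // toy fixed-width encoding
    buf.append(reinterpret_cast<const char*>(&s), sizeof(s));
  le.snaps = std::move(buf);                          // hand over the buffer, no intermediate copy
}
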
@@ -514,7 +517,7 @@ void ReplicatedBackend::submit_transaction(
   generate_transaction(
     t,
     coll,
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN),
+    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
     log_entries,
     &op_t,
     &added,
@@ -722,7 +725,7 @@ void ReplicatedBackend::be_deep_scrub(
            poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
          pos,
          cct->_conf->osd_deep_scrub_stride, bl,
-         fadvise_flags, true);
+         fadvise_flags);
     if (r <= 0)
       break;
 
@@ -770,14 +773,9 @@ void ReplicatedBackend::be_deep_scrub(
     ghobject_t(
       poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
   assert(iter);
-  uint64_t keys_scanned = 0;
   for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
     iter->next(false)) {
-    if (cct->_conf->osd_scan_list_ping_tp_interval &&
-       (keys_scanned % cct->_conf->osd_scan_list_ping_tp_interval == 0)) {
-      handle.reset_tp_timeout();
-    }
-    ++keys_scanned;
+    handle.reset_tp_timeout();
 
     dout(25) << "CRC key " << iter->key() << " value:\n";
     iter->value().hexdump(*_dout);
@@ -855,7 +853,12 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
       assert(j != bc->pulling.end());
       ObjectContextRef obc = j->second.obc;
       bc->clear_pull(j, false /* already did it */);
-      if (!bc->start_pushes(i.hoid, obc, h)) {
+      int started = bc->start_pushes(i.hoid, obc, h);
+      if (started < 0) {
+       bc->pushing[i.hoid].clear();
+       bc->get_parent()->primary_failed(i.hoid);
+       bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
+      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat);
       }
@@ -1474,7 +1477,7 @@ void ReplicatedBackend::prepare_pull(
  * intelligently push an object to a replica.  make use of existing
  * clones/heads and dup data ranges where possible.
  */
-void ReplicatedBackend::prep_push_to_replica(
+int ReplicatedBackend::prep_push_to_replica(
   ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
   PushOp *pop, bool cache_dont_need)
 {
@@ -1537,7 +1540,7 @@ void ReplicatedBackend::prep_push_to_replica(
       lock_manager);
   }
 
-  prep_push(
+  return prep_push(
     obc,
     soid,
     peer,
@@ -1549,7 +1552,7 @@ void ReplicatedBackend::prep_push_to_replica(
     std::move(lock_manager));
 }
 
-void ReplicatedBackend::prep_push(ObjectContextRef obc,
+int ReplicatedBackend::prep_push(ObjectContextRef obc,
                             const hobject_t& soid, pg_shard_t peer,
                             PushOp *pop, bool cache_dont_need)
 {
@@ -1558,12 +1561,12 @@ void ReplicatedBackend::prep_push(ObjectContextRef obc,
     data_subset.insert(0, obc->obs.oi.size);
   map<hobject_t, interval_set<uint64_t>> clone_subsets;
 
-  prep_push(obc, soid, peer,
+  return prep_push(obc, soid, peer,
            obc->obs.oi.version, data_subset, clone_subsets,
            pop, cache_dont_need, ObcLockManager());
 }
 
-void ReplicatedBackend::prep_push(
+int ReplicatedBackend::prep_push(
   ObjectContextRef obc,
   const hobject_t& soid, pg_shard_t peer,
   eversion_t version,
@@ -1592,8 +1595,10 @@ void ReplicatedBackend::prep_push(
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
-  assert(r == 0);
+  if (r < 0)
+    return r;
   pi.recovery_progress = new_progress;
+  return 0;
 }
 
 void ReplicatedBackend::submit_push_data(
@@ -1723,7 +1728,7 @@ bool ReplicatedBackend::handle_pull_response(
           << dendl;
   if (pop.version == eversion_t()) {
     // replica doesn't have it!
-    _failed_push(from, pop.soid);
+    _failed_pull(from, pop.soid);
     return false;
   }
 
@@ -1742,6 +1747,10 @@ bool ReplicatedBackend::handle_pull_response(
     pi.recovery_info.copy_subset.intersection_of(
       pop.recovery_info.copy_subset);
   }
+  // If primary doesn't have object info and didn't know version
+  if (pi.recovery_info.version == eversion_t()) {
+    pi.recovery_info.version = pop.version;
+  }
 
   bool first = pi.recovery_progress.first;
   if (first) {
@@ -1917,12 +1926,13 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
   ObjectRecoveryProgress &new_progress = *out_progress;
   new_progress = progress;
 
-  dout(7) << "send_push_op " << recovery_info.soid
+  dout(7) << __func__ << " " << recovery_info.soid
          << " v " << recovery_info.version
          << " size " << recovery_info.size
          << " recovery_info: " << recovery_info
           << dendl;
 
+  eversion_t v  = recovery_info.version;
   if (progress.first) {
     int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
     if(r < 0) {
@@ -1937,9 +1947,19 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
 
     // Debug
     bufferlist bv = out_op->attrset[OI_ATTR];
-    object_info_t oi(bv);
+    object_info_t oi;
+    try {
+     bufferlist::iterator bliter = bv.begin();
+     ::decode(oi, bliter);
+    } catch (...) {
+      dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
+      return -EINVAL;
+    }
 
-    if (oi.version != recovery_info.version) {
+    // If requestor didn't know the version, use ours
+    if (v == eversion_t()) {
+      v = oi.version;
+    } else if (oi.version != v) {
       get_parent()->clog_error() << get_info().pgid << " push "
                                 << recovery_info.soid << " v "
                                 << recovery_info.version
@@ -1950,6 +1970,9 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
 
     new_progress.first = false;
   }
+  // Once we provide the version subsequent requests will have it, so
+  // at this point it must be known.
+  assert(v != eversion_t());
 
   uint64_t available = cct->_conf->osd_recovery_max_chunk;
   if (!progress.omap_complete) {
@@ -2007,9 +2030,17 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
        p != out_op->data_included.end();
        ++p) {
     bufferlist bit;
-    store->read(ch, ghobject_t(recovery_info.soid),
+    int r = store->read(ch, ghobject_t(recovery_info.soid),
                p.get_start(), p.get_len(), bit,
                 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
+    if (cct->_conf->osd_debug_random_push_read_error &&
+        (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
+      dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
+      r = -EIO;
+    }
+    if (r < 0) {
+      return r;
+    }
     if (p.get_len() != bit.length()) {
       dout(10) << " extent " << p.get_start() << "~" << p.get_len()
               << " is actually " << p.get_start() << "~" << bit.length()
@@ -2046,7 +2077,7 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
   get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
 
   // send
-  out_op->version = recovery_info.version;
+  out_op->version = v;
   out_op->soid = recovery_info.soid;
   out_op->recovery_info = recovery_info;
   out_op->after_progress = new_progress;
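
build_push_op() now checks the result of store->read() and can inject a read error through the osd_debug_random_push_read_error option so the new failure paths can be exercised in testing. The injection idiom, reduced to a standalone sketch (the config knob here is a hypothetical stand-in):

#include <cerrno>
#include <cstdlib>

double debug_random_push_read_error = 2.0;   // hypothetical knob; 0 disables injection

// Wrap a read result: with the knob set, roughly one call in (knob * 100)
// is turned into -EIO so the caller's error handling gets exercised.
int maybe_inject_read_error(int r) {
  if (debug_random_push_read_error &&
      (std::rand() % (int)(debug_random_push_read_error * 100.0)) == 0)
    r = -EIO;
  return r;
}
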
@@ -2076,8 +2107,9 @@ bool ReplicatedBackend::handle_push_reply(
     return false;
   } else {
     PushInfo *pi = &pushing[soid][peer];
+    bool error = pushing[soid].begin()->second.recovery_progress.error;
 
-    if (!pi->recovery_progress.data_complete) {
+    if (!pi->recovery_progress.data_complete && !error) {
       dout(10) << " pushing more from, "
               << pi->recovery_progress.data_recovered_to
               << " of " << pi->recovery_info.copy_subset << dendl;
@@ -2086,23 +2118,40 @@ bool ReplicatedBackend::handle_push_reply(
        pi->recovery_info,
        pi->recovery_progress, &new_progress, reply,
        &(pi->stat));
-      assert(r == 0);
+      // Handle the case of a read error right after we wrote, which is
+      // hopefully extremely rare.
+      if (r < 0) {
+        dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
+
+       error = true;
+       goto done;
+      }
       pi->recovery_progress = new_progress;
       return true;
     } else {
       // done!
-      get_parent()->on_peer_recover(
-       peer, soid, pi->recovery_info);
+done:
+      if (!error)
+       get_parent()->on_peer_recover( peer, soid, pi->recovery_info);
 
       get_parent()->release_locks(pi->lock_manager);
       object_stat_sum_t stat = pi->stat;
+      eversion_t v = pi->recovery_info.version;
       pushing[soid].erase(peer);
       pi = NULL;
 
       if (pushing[soid].empty()) {
-       get_parent()->on_global_recover(soid, stat);
+       if (!error)
+         get_parent()->on_global_recover(soid, stat);
+       else
+         get_parent()->on_primary_error(soid, v);
+
        pushing.erase(soid);
       } else {
+       // This looks weird, but we erased the current peer and need to remember
+       // the error on any other one, while getting more acks.
+       if (error)
+         pushing[soid].begin()->second.recovery_progress.error = true;
        dout(10) << "pushed " << soid << ", still waiting for push ack from "
                 << pushing[soid].size() << " others" << dendl;
       }
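
handle_push_reply() now has to remember a failed push while acks from the other replicas are still outstanding; because the acked peer's entry is erased, the error flag is parked on one of the surviving entries. A simplified model of that bookkeeping, using hypothetical plain types rather than the real PushInfo/ObjectRecoveryProgress:

#include <map>
#include <string>

struct PushInfo { bool error = false; };                 // stand-in for per-peer push state
std::map<std::string, std::map<int, PushInfo>> pushing;  // oid -> peer -> state

// Called when 'peer' acks its push of 'oid'; 'failed' marks a failed push.
// Returns true only once every peer has acked and no push failed.
bool handle_ack(const std::string& oid, int peer, bool failed) {
  auto& peers = pushing[oid];
  failed = failed || peers[peer].error;    // pick up an error parked earlier
  peers.erase(peer);
  if (peers.empty()) {
    pushing.erase(oid);
    return !failed;                        // clean global recover vs. primary error
  }
  if (failed)
    peers.begin()->second.error = true;    // park the error on a surviving entry
  return false;                            // still waiting for more acks
}
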
@@ -2183,8 +2232,9 @@ void ReplicatedBackend::trim_pushed_data(
   }
 }
 
-void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
+void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
 {
+  dout(20) << __func__ << ": " << soid << " from " << from << dendl;
   list<pg_shard_t> fl = { from };
   get_parent()->failed_push(fl, soid);
 
@@ -2216,7 +2266,9 @@ int ReplicatedBackend::start_pushes(
   ObjectContextRef obc,
   RPGHandle *h)
 {
-  int pushes = 0;
+  list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
+
+  dout(20) << __func__ << " soid " << soid << dendl;
   // who needs it?
   assert(get_parent()->get_actingbackfill_shards().size() > 0);
   for (set<pg_shard_t>::iterator i =
@@ -2229,11 +2281,28 @@ int ReplicatedBackend::start_pushes(
       get_parent()->get_shard_missing().find(peer);
     assert(j != get_parent()->get_shard_missing().end());
     if (j->second.is_missing(soid)) {
-      ++pushes;
-      h->pushes[peer].push_back(PushOp());
-      prep_push_to_replica(obc, soid, peer,
-                          &(h->pushes[peer].back()), h->cache_dont_need);
+      shards.push_back(j);
+    }
+  }
+
+  // If more than 1 read will occur ignore possible request to not cache
+  bool cache = shards.size() == 1 ? h->cache_dont_need : false;
+
+  for (auto j : shards) {
+    pg_shard_t peer = j->first;
+    h->pushes[peer].push_back(PushOp());
+    int r = prep_push_to_replica(obc, soid, peer,
+           &(h->pushes[peer].back()), cache);
+    if (r < 0) {
+      // A read failed; back out the PushOps queued so far
+      for (auto k : shards) {
+       pg_shard_t p = k->first;
+       dout(10) << __func__ << " clean up peer " << p << dendl;
+       h->pushes[p].pop_back();
+       if (p == peer) break;
+      }
+      return r;
     }
   }
-  return pushes;
+  return shards.size();
 }
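
start_pushes() now first collects the shards that are missing the object, then prepares one PushOp per shard; if any preparation fails, the PushOps queued so far (up to and including the failed one) are popped again before the error is returned. A standalone sketch of that back-out loop with hypothetical types:

#include <cerrno>
#include <map>
#include <vector>

struct PushOp { int bytes = 0; };                        // stand-in for the real PushOp

int prep_push(int peer, PushOp* op) {                    // pretend peer 2 hits a read error
  return peer == 2 ? -EIO : 0;
}

int start_pushes(const std::vector<int>& shards,
                 std::map<int, std::vector<PushOp>>& pushes) {
  for (int peer : shards) {
    pushes[peer].push_back(PushOp());
    int r = prep_push(peer, &pushes[peer].back());
    if (r < 0) {
      for (int p : shards) {                             // back out everything queued so far
        pushes[p].pop_back();
        if (p == peer)
          break;
      }
      return r;
    }
  }
  return (int)shards.size();                             // number of pushes started
}
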