]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/ECBackend.cc
update sources to 12.2.7
[ceph.git] / ceph / src / osd / ECBackend.cc
index 3e6663630d13fff2a6418c34517f64284a884c60..23e5a50f0166ceb891f11e602ea81503f0412307 100644 (file)
@@ -254,14 +254,17 @@ struct OnRecoveryReadComplete :
 struct RecoveryMessages {
   map<hobject_t,
       ECBackend::read_request_t> reads;
+  map<hobject_t, set<int>> want_to_read;
   void read(
     ECBackend *ec,
     const hobject_t &hoid, uint64_t off, uint64_t len,
+    set<int> &&_want_to_read,
     const set<pg_shard_t> &need,
     bool attrs) {
     list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
     to_read.push_back(boost::make_tuple(off, len, 0));
     assert(!reads.count(hoid));
+    want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
     reads.insert(
       make_pair(
        hoid,
@@ -526,6 +529,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
     return;
   start_read_op(
     priority,
+    m.want_to_read,
     m.reads,
     OpRequestRef(),
     false, true);
@@ -571,6 +575,7 @@ void ECBackend::continue_recovery_op(
        op.hoid,
        op.recovery_progress.data_recovered_to,
        amount,
+       std::move(want),
        to_read,
        op.recovery_progress.first && !op.obc);
       op.extent_requested = make_pair(
@@ -758,6 +763,7 @@ bool ECBackend::_handle_message(
     // not conflict with ECSubWrite's operator<<.
     MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
       _op->get_nonconst_req());
+    parent->maybe_preempt_replica_scrub(op->op.soid);
     handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
     return true;
   }
@@ -1203,10 +1209,9 @@ void ECBackend::handle_sub_read_reply(
         have.insert(j->first.shard);
         dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
       }
-      set<int> want_to_read, dummy_minimum;
-      get_want_to_read_shards(&want_to_read);
+      set<int> dummy_minimum;
       int err;
-      if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
+      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
        dout(20) << __func__ << " minimum_to_decode failed" << dendl;
         if (rop.in_progress.empty()) {
          // If we don't have enough copies and we haven't sent reads for all shards
@@ -1489,6 +1494,7 @@ void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
 
 void ECBackend::get_all_avail_shards(
   const hobject_t &hoid,
+  const set<pg_shard_t> &error_shards,
   set<int> &have,
   map<shard_id_t, pg_shard_t> &shards,
   bool for_recovery)
@@ -1499,6 +1505,8 @@ void ECBackend::get_all_avail_shards(
        ++i) {
     dout(10) << __func__ << ": checking acting " << *i << dendl;
     const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+    if (error_shards.find(*i) != error_shards.end())
+      continue;
     if (!missing.is_missing(hoid)) {
       assert(!have.count(i->shard));
       have.insert(i->shard);
@@ -1512,6 +1520,8 @@ void ECBackend::get_all_avail_shards(
           get_parent()->get_backfill_shards().begin();
         i != get_parent()->get_backfill_shards().end();
         ++i) {
+      if (error_shards.find(*i) != error_shards.end())
+       continue;
       if (have.count(i->shard)) {
        assert(shards.count(i->shard));
        continue;
@@ -1538,6 +1548,8 @@ void ECBackend::get_all_avail_shards(
        if (m) {
          assert(!(*m).is_missing(hoid));
        }
+       if (error_shards.find(*i) != error_shards.end())
+         continue;
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
       }
@@ -1557,8 +1569,9 @@ int ECBackend::get_min_avail_to_read_shards(
 
   set<int> have;
   map<shard_id_t, pg_shard_t> shards;
+  set<pg_shard_t> error_shards;
 
-  get_all_avail_shards(hoid, have, shards, for_recovery);
+  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
 
   set<int> need;
   int r = ec_impl->minimum_to_decode(want, have, &need);
@@ -1584,6 +1597,8 @@ int ECBackend::get_min_avail_to_read_shards(
 int ECBackend::get_remaining_shards(
   const hobject_t &hoid,
   const set<int> &avail,
+  const set<int> &want,
+  const read_result_t &result,
   set<pg_shard_t> *to_read,
   bool for_recovery)
 {
@@ -1591,21 +1606,41 @@ int ECBackend::get_remaining_shards(
 
   set<int> have;
   map<shard_id_t, pg_shard_t> shards;
+  set<pg_shard_t> error_shards;
+  for (auto &p : result.errors) {
+    error_shards.insert(p.first);
+  }
+
+  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+
+  set<int> need;
+  int r = ec_impl->minimum_to_decode(want, have, &need);
+  if (r < 0) {
+    dout(0) << __func__ << " not enough shards left to try for " << hoid
+           << " read result was " << result << dendl;
+    return -EIO;
+  }
 
-  get_all_avail_shards(hoid, have, shards, for_recovery);
+  set<int> shards_left;
+  for (auto p : need) {
+    if (avail.find(p) == avail.end()) {
+      shards_left.insert(p);
+    }
+  }
 
-  for (set<int>::iterator i = have.begin();
-       i != have.end();
+  for (set<int>::iterator i = shards_left.begin();
+       i != shards_left.end();
        ++i) {
     assert(shards.count(shard_id_t(*i)));
-    if (avail.find(*i) == avail.end())
-      to_read->insert(shards[shard_id_t(*i)]);
+    assert(avail.find(*i) == avail.end());
+    to_read->insert(shards[shard_id_t(*i)]);
   }
   return 0;
 }
 
 void ECBackend::start_read_op(
   int priority,
+  map<hobject_t, set<int>> &want_to_read,
   map<hobject_t, read_request_t> &to_read,
   OpRequestRef _op,
   bool do_redundant_reads,
@@ -1621,6 +1656,7 @@ void ECBackend::start_read_op(
       do_redundant_reads,
       for_recovery,
       _op,
+      std::move(want_to_read),
       std::move(to_read))).first->second;
   dout(10) << __func__ << ": starting " << op << dendl;
   if (_op) {
@@ -2274,6 +2310,7 @@ void ECBackend::objects_read_and_reconstruct(
     return;
   }
 
+  map<hobject_t, set<int>> obj_want_to_read;
   set<int> want_to_read;
   get_want_to_read_shards(&want_to_read);
     
@@ -2301,10 +2338,12 @@ void ECBackend::objects_read_and_reconstruct(
          shards,
          false,
          c)));
+    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
   }
 
   start_read_op(
     CEPH_MSG_PRIO_DEFAULT,
+    obj_want_to_read,
     for_read_op,
     OpRequestRef(),
     fast_read, false);
@@ -2322,31 +2361,24 @@ int ECBackend::send_all_remaining_reads(
     already_read.insert(i->shard);
   dout(10) << __func__ << " have/error shards=" << already_read << dendl;
   set<pg_shard_t> shards;
-  int r = get_remaining_shards(hoid, already_read, &shards, rop.for_recovery);
+  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
+                              rop.complete[hoid], &shards, rop.for_recovery);
   if (r)
     return r;
-  if (shards.empty())
-    return -EIO;
-
-  dout(10) << __func__ << " Read remaining shards " << shards << dendl;
 
-  // TODOSAM: this doesn't seem right
   list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
     rop.to_read.find(hoid)->second.to_read;
   GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
     rop.to_read.find(hoid)->second.cb;
 
-  map<hobject_t, read_request_t> for_read_op;
-  for_read_op.insert(
-    make_pair(
+  rop.to_read.erase(hoid);
+  rop.to_read.insert(make_pair(
       hoid,
       read_request_t(
        offsets,
        shards,
        false,
        c)));
-
-  rop.to_read.swap(for_read_op);
   do_read_op(rop);
   return 0;
 }
@@ -2386,47 +2418,60 @@ void ECBackend::rollback_append(
       old_size));
 }
 
-void ECBackend::be_deep_scrub(
+int ECBackend::be_deep_scrub(
   const hobject_t &poid,
-  uint32_t seed,
-  ScrubMap::object &o,
-  ThreadPool::TPHandle &handle) {
-  bufferhash h(-1); // we always used -1
+  ScrubMap &map,
+  ScrubMapBuilder &pos,
+  ScrubMap::object &o)
+{
+  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
   int r;
+
+  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                           CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
+
+  utime_t sleeptime;
+  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
+  if (sleeptime != utime_t()) {
+    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
+    sleeptime.sleep();
+  }
+
+  if (pos.data_pos == 0) {
+    pos.data_hash = bufferhash(-1);
+  }
+
   uint64_t stride = cct->_conf->osd_deep_scrub_stride;
   if (stride % sinfo.get_chunk_size())
     stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
-  uint64_t pos = 0;
 
-  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
-
-  while (true) {
-    bufferlist bl;
-    handle.reset_tp_timeout();
-    r = store->read(
-      ch,
-      ghobject_t(
-       poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-      pos,
-      stride, bl,
-      fadvise_flags);
-    if (r < 0)
-      break;
-    if (bl.length() % sinfo.get_chunk_size()) {
-      r = -EIO;
-      break;
-    }
-    pos += r;
-    h << bl;
-    if ((unsigned)r < stride)
-      break;
+  bufferlist bl;
+  r = store->read(
+    ch,
+    ghobject_t(
+      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+    pos.data_pos,
+    stride, bl,
+    fadvise_flags);
+  if (r < 0) {
+    dout(20) << __func__ << "  " << poid << " got "
+            << r << " on read, read_error" << dendl;
+    o.read_error = true;
+    return 0;
   }
-
-  if (r == -EIO) {
-    dout(0) << "_scan_list  " << poid << " got "
-           << r << " on read, read_error" << dendl;
+  if (bl.length() % sinfo.get_chunk_size()) {
+    dout(20) << __func__ << "  " << poid << " got "
+            << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
+            << dendl;
     o.read_error = true;
-    return;
+    return 0;
+  }
+  if (r > 0) {
+    pos.data_hash << bl;
+  }
+  pos.data_pos += r;
+  if (r == (int)stride) {
+    return -EINPROGRESS;
   }
 
   ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
@@ -2434,20 +2479,27 @@ void ECBackend::be_deep_scrub(
     dout(0) << "_scan_list  " << poid << " could not retrieve hash info" << dendl;
     o.read_error = true;
     o.digest_present = false;
-    return;
+    return 0;
   } else {
     if (!get_parent()->get_pool().allows_ecoverwrites()) {
       assert(hinfo->has_chunk_hash());
-      if (hinfo->get_total_chunk_size() != pos) {
-       dout(0) << "_scan_list  " << poid << " got incorrect size on read" << dendl;
+      if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
+       dout(0) << "_scan_list  " << poid << " got incorrect size on read 0x"
+               << std::hex << pos
+               << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
+               << dendl;
        o.ec_size_mismatch = true;
-       return;
+       return 0;
       }
 
-      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
-       dout(0) << "_scan_list  " << poid << " got incorrect hash on read" << dendl;
+      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
+         pos.data_hash.digest()) {
+       dout(0) << "_scan_list  " << poid << " got incorrect hash on read 0x"
+               << std::hex << pos.data_hash.digest() << " !=  expected 0x"
+               << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
+               << std::dec << dendl;
        o.ec_hash_mismatch = true;
-       return;
+       return 0;
       }
 
       /* We checked above that we match our own stored hash.  We cannot
@@ -2467,6 +2519,7 @@ void ECBackend::be_deep_scrub(
     }
   }
 
-  o.omap_digest = seed;
+  o.omap_digest = -1;
   o.omap_digest_present = true;
+  return 0;
 }