delete h;
}
-void ReplicatedBackend::recover_object(
+int ReplicatedBackend::recover_object(
const hobject_t &hoid,
eversion_t v,
ObjectContextRef head,
hoid,
head,
h);
- return;
} else {
assert(obc);
int started = start_pushes(
hoid,
obc,
h);
- assert(started > 0);
+ if (started < 0) {
+ pushing[hoid].clear();
+ return started;
+ }
}
+ return 0;
}
void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
le.mark_unrollbackable();
auto oiter = pgt->op_map.find(le.soid);
if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
- vector<snapid_t> snaps(
- oiter->second.updated_snaps->second.begin(),
- oiter->second.updated_snaps->second.end());
- ::encode(snaps, le.snaps);
+ bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
+ ::encode(oiter->second.updated_snaps->second, bl);
+ le.snaps.swap(bl);
+ le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
}
}
generate_transaction(
t,
coll,
- !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN),
+ (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
log_entries,
&op_t,
&added,
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos,
cct->_conf->osd_deep_scrub_stride, bl,
- fadvise_flags, true);
+ fadvise_flags);
if (r <= 0)
break;
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
assert(iter);
- uint64_t keys_scanned = 0;
for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
iter->next(false)) {
- if (cct->_conf->osd_scan_list_ping_tp_interval &&
- (keys_scanned % cct->_conf->osd_scan_list_ping_tp_interval == 0)) {
- handle.reset_tp_timeout();
- }
- ++keys_scanned;
+ handle.reset_tp_timeout();
dout(25) << "CRC key " << iter->key() << " value:\n";
iter->value().hexdump(*_dout);
assert(j != bc->pulling.end());
ObjectContextRef obc = j->second.obc;
bc->clear_pull(j, false /* already did it */);
- if (!bc->start_pushes(i.hoid, obc, h)) {
+ int started = bc->start_pushes(i.hoid, obc, h);
+ if (started < 0) {
+ bc->pushing[i.hoid].clear();
+ bc->get_parent()->primary_failed(i.hoid);
+ bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
+ } else if (!started) {
bc->get_parent()->on_global_recover(
i.hoid, i.stat);
}
* intelligently push an object to a replica. make use of existing
* clones/heads and dup data ranges where possible.
*/
-void ReplicatedBackend::prep_push_to_replica(
+int ReplicatedBackend::prep_push_to_replica(
ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
PushOp *pop, bool cache_dont_need)
{
lock_manager);
}
- prep_push(
+ return prep_push(
obc,
soid,
peer,
std::move(lock_manager));
}
-void ReplicatedBackend::prep_push(ObjectContextRef obc,
+int ReplicatedBackend::prep_push(ObjectContextRef obc,
const hobject_t& soid, pg_shard_t peer,
PushOp *pop, bool cache_dont_need)
{
data_subset.insert(0, obc->obs.oi.size);
map<hobject_t, interval_set<uint64_t>> clone_subsets;
- prep_push(obc, soid, peer,
+ return prep_push(obc, soid, peer,
obc->obs.oi.version, data_subset, clone_subsets,
pop, cache_dont_need, ObcLockManager());
}
-void ReplicatedBackend::prep_push(
+int ReplicatedBackend::prep_push(
ObjectContextRef obc,
const hobject_t& soid, pg_shard_t peer,
eversion_t version,
&new_progress,
pop,
&(pi.stat), cache_dont_need);
- assert(r == 0);
+ if (r < 0)
+ return r;
pi.recovery_progress = new_progress;
+ return 0;
}
void ReplicatedBackend::submit_push_data(
<< dendl;
if (pop.version == eversion_t()) {
// replica doesn't have it!
- _failed_push(from, pop.soid);
+ _failed_pull(from, pop.soid);
return false;
}
pi.recovery_info.copy_subset.intersection_of(
pop.recovery_info.copy_subset);
}
+ // If primary doesn't have object info and didn't know version
+ if (pi.recovery_info.version == eversion_t()) {
+ pi.recovery_info.version = pop.version;
+ }
bool first = pi.recovery_progress.first;
if (first) {
ObjectRecoveryProgress &new_progress = *out_progress;
new_progress = progress;
- dout(7) << "send_push_op " << recovery_info.soid
+ dout(7) << __func__ << " " << recovery_info.soid
<< " v " << recovery_info.version
<< " size " << recovery_info.size
<< " recovery_info: " << recovery_info
<< dendl;
+ eversion_t v = recovery_info.version;
if (progress.first) {
int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
if(r < 0) {
// Debug
bufferlist bv = out_op->attrset[OI_ATTR];
- object_info_t oi(bv);
+ object_info_t oi;
+ try {
+ bufferlist::iterator bliter = bv.begin();
+ ::decode(oi, bliter);
+ } catch (...) {
+ dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
+ return -EINVAL;
+ }
- if (oi.version != recovery_info.version) {
+ // If requestor didn't know the version, use ours
+ if (v == eversion_t()) {
+ v = oi.version;
+ } else if (oi.version != v) {
get_parent()->clog_error() << get_info().pgid << " push "
<< recovery_info.soid << " v "
<< recovery_info.version
new_progress.first = false;
}
+ // Once we provide the version subsequent requests will have it, so
+ // at this point it must be known.
+ assert(v != eversion_t());
uint64_t available = cct->_conf->osd_recovery_max_chunk;
if (!progress.omap_complete) {
p != out_op->data_included.end();
++p) {
bufferlist bit;
- store->read(ch, ghobject_t(recovery_info.soid),
+ int r = store->read(ch, ghobject_t(recovery_info.soid),
p.get_start(), p.get_len(), bit,
cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
+ if (cct->_conf->osd_debug_random_push_read_error &&
+ (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
+ dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
+ r = -EIO;
+ }
+ if (r < 0) {
+ return r;
+ }
if (p.get_len() != bit.length()) {
dout(10) << " extent " << p.get_start() << "~" << p.get_len()
<< " is actually " << p.get_start() << "~" << bit.length()
get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
// send
- out_op->version = recovery_info.version;
+ out_op->version = v;
out_op->soid = recovery_info.soid;
out_op->recovery_info = recovery_info;
out_op->after_progress = new_progress;
return false;
} else {
PushInfo *pi = &pushing[soid][peer];
+ bool error = pushing[soid].begin()->second.recovery_progress.error;
- if (!pi->recovery_progress.data_complete) {
+ if (!pi->recovery_progress.data_complete && !error) {
dout(10) << " pushing more from, "
<< pi->recovery_progress.data_recovered_to
<< " of " << pi->recovery_info.copy_subset << dendl;
pi->recovery_info,
pi->recovery_progress, &new_progress, reply,
&(pi->stat));
- assert(r == 0);
+ // Handle the case of a read error right after we wrote, which is
+ // hopefully extremely rare.
+ if (r < 0) {
+ dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
+
+ error = true;
+ goto done;
+ }
pi->recovery_progress = new_progress;
return true;
} else {
// done!
- get_parent()->on_peer_recover(
- peer, soid, pi->recovery_info);
+done:
+ if (!error)
+ get_parent()->on_peer_recover( peer, soid, pi->recovery_info);
get_parent()->release_locks(pi->lock_manager);
object_stat_sum_t stat = pi->stat;
+ eversion_t v = pi->recovery_info.version;
pushing[soid].erase(peer);
pi = NULL;
if (pushing[soid].empty()) {
- get_parent()->on_global_recover(soid, stat);
+ if (!error)
+ get_parent()->on_global_recover(soid, stat);
+ else
+ get_parent()->on_primary_error(soid, v);
+
pushing.erase(soid);
} else {
+ // This looks weird, but we erased the current peer and need to remember
+ // the error on any other one, while getting more acks.
+ if (error)
+ pushing[soid].begin()->second.recovery_progress.error = true;
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
}
}
}
-void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
+void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
{
+ dout(20) << __func__ << ": " << soid << " from " << from << dendl;
list<pg_shard_t> fl = { from };
get_parent()->failed_push(fl, soid);
ObjectContextRef obc,
RPGHandle *h)
{
- int pushes = 0;
+ list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
+
+ dout(20) << __func__ << " soid " << soid << dendl;
// who needs it?
assert(get_parent()->get_actingbackfill_shards().size() > 0);
for (set<pg_shard_t>::iterator i =
get_parent()->get_shard_missing().find(peer);
assert(j != get_parent()->get_shard_missing().end());
if (j->second.is_missing(soid)) {
- ++pushes;
- h->pushes[peer].push_back(PushOp());
- prep_push_to_replica(obc, soid, peer,
- &(h->pushes[peer].back()), h->cache_dont_need);
+ shards.push_back(j);
+ }
+ }
+
+ // If more than 1 read will occur ignore possible request to not cache
+ bool cache = shards.size() == 1 ? h->cache_dont_need : false;
+
+ for (auto j : shards) {
+ pg_shard_t peer = j->first;
+ h->pushes[peer].push_back(PushOp());
+ int r = prep_push_to_replica(obc, soid, peer,
+ &(h->pushes[peer].back()), cache);
+ if (r < 0) {
+ // Back out all failed reads
+ for (auto k : shards) {
+ pg_shard_t p = k->first;
+ dout(10) << __func__ << " clean up peer " << p << dendl;
+ h->pushes[p].pop_back();
+ if (p == peer) break;
+ }
+ return r;
}
}
- return pushes;
+ return shards.size();
}