#include "Session.h"
#include "objclass/objclass.h"
+#include "common/ceph_crypto.h"
#include "common/errno.h"
#include "common/scrub_types.h"
#include "common/perf_counters.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MCommandReply.h"
#include "messages/MOSDScrubReserve.h"
-#include "mds/inode_backtrace.h" // Ugh
#include "common/EventTrace.h"
#include "common/config.h"
#define DOUT_PREFIX_ARGS this, osd->whoami, get_osdmap()
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
+using TOPNSPC::common::cmd_getval;
+
template <typename T>
static ostream& _prefix(std::ostream *_dout, T *pg) {
return pg->gen_prefix(*_dout);
MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
-PGLSFilter::PGLSFilter() : cct(nullptr)
-{
-}
-
-PGLSFilter::~PGLSFilter()
-{
-}
+using namespace ceph::osd::scheduler;
/**
* The CopyCallback class defines an interface for completions to the
BlessedGenContext(PrimaryLogPG *pg, GenContext<T> *c, epoch_t e)
: pg(pg), c(c), e(e) {}
void finish(T t) override {
- pg->lock();
+ std::scoped_lock locker{*pg};
if (pg->pg_has_reset_since(e))
c.reset();
else
c.release()->complete(t);
- pg->unlock();
}
bool sync_finish(T t) {
// we assume here all blessed/wrapped Contexts can complete synchronously.
BlessedContext(PrimaryLogPG *pg, Context *c, epoch_t e)
: pg(pg), c(c), e(e) {}
void finish(int r) override {
- pg->lock();
+ std::scoped_lock locker{*pg};
if (pg->pg_has_reset_since(e))
c.reset();
else
c.release()->complete(r);
- pg->unlock();
}
bool sync_finish(int r) {
// we assume here all blessed/wrapped Contexts can complete synchronously.
return true;
}
void finish(int r) override {
- pg->lock();
+ std::scoped_lock locker{*pg};
pg->_applied_recovered_object(obc);
- pg->unlock();
}
};
return true;
}
void finish(int r) override {
- pg->lock();
+ std::scoped_lock locker{*pg};
pg->_applied_recovered_object_replica();
- pg->unlock();
}
};
PrimaryLogPG::CopyResults *results = nullptr;
PrimaryLogPG::OpContext *ctx;
OSDOp &osd_op;
+ uint32_t truncate_seq;
+ uint64_t truncate_size;
+ bool have_truncate = false;
CopyFromCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
: ctx(ctx), osd_op(osd_op) {
results = results_.get<1>();
int r = results_.get<0>();
+ // Only use truncate_{seq,size} from the original object if the client
+ // did not sent us these parameters
+ if (!have_truncate) {
+ truncate_seq = results->truncate_seq;
+ truncate_size = results->truncate_size;
+ }
+
// for finish_copyfrom
ctx->user_at_version = results->user_version;
uint64_t get_data_size() {
return results->object_size;
}
+ void set_truncate(uint32_t seq, uint64_t size) {
+ truncate_seq = seq;
+ truncate_size = size;
+ have_truncate = true;
+ }
};
struct CopyFromFinisher : public PrimaryLogPG::OpFinisher {
derr << __func__ << " " << hoid << " had no clone_snaps" << dendl;
}
}
- if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
- pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
+ if (!is_delete && recovery_state.get_pg_log().get_missing().is_missing(recovery_info.soid) &&
+ recovery_state.get_pg_log().get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
ceph_assert(is_primary());
- const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+ const pg_log_entry_t *latest = recovery_state.get_pg_log().get_log().objects.find(recovery_info.soid)->second;
if (latest->op == pg_log_entry_t::LOST_REVERT &&
latest->reverting_to == recovery_info.version) {
dout(10) << " got old revert version " << recovery_info.version
// keep track of active pushes for scrub
++active_pushes;
- if (recovery_info.version > pg_log.get_can_rollback_to()) {
- /* This can only happen during a repair, and even then, it would
- * be one heck of a race. If we are repairing the object, the
- * write in question must be fully committed, so it's not valid
- * to roll it back anyway (and we'll be rolled forward shortly
- * anyway) */
- PGLogEntryHandler h{this, t};
- pg_log.roll_forward_to(recovery_info.version, &h);
- }
- recover_got(recovery_info.soid, recovery_info.version);
+ recovery_state.recover_got(
+ recovery_info.soid,
+ recovery_info.version,
+ is_delete,
+ *t);
if (is_primary()) {
if (!is_delete) {
t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
publish_stats_to_osd();
- ceph_assert(missing_loc.needs_recovery(hoid));
- if (!is_delete)
- missing_loc.add_location(hoid, pg_whoami);
release_backoffs(hoid);
if (!is_unreadable_object(hoid)) {
auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
this,
get_osdmap_epoch(),
info.last_complete));
-
- // update pg
- dirty_info = true;
- write_if_dirty(*t);
}
void PrimaryLogPG::on_global_recover(
const object_stat_sum_t &stat_diff,
bool is_delete)
{
- info.stats.stats.sum.add(stat_diff);
- missing_loc.recovered(soid);
+ recovery_state.object_recovered(soid, stat_diff);
publish_stats_to_osd();
dout(10) << "pushed " << soid << " to all replicas" << dendl;
map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
finish_degraded_object(soid);
}
-void PrimaryLogPG::on_peer_recover(
- pg_shard_t peer,
- const hobject_t &soid,
- const ObjectRecoveryInfo &recovery_info)
-{
- publish_stats_to_osd();
- // done!
- peer_missing[peer].got(soid, recovery_info.version);
- missing_loc.add_location(soid, peer);
-}
-
-void PrimaryLogPG::begin_peer_recover(
- pg_shard_t peer,
- const hobject_t soid)
-{
- peer_missing[peer].revise_have(soid, eversion_t());
-}
-
void PrimaryLogPG::schedule_recovery_work(
GenContext<ThreadPool::TPHandle&> *c)
{
osd->queue_recovery_context(this, c);
}
-void PrimaryLogPG::send_message_osd_cluster(
- int peer, Message *m, epoch_t from_epoch)
+void PrimaryLogPG::replica_clear_repop_obc(
+ const vector<pg_log_entry_t> &logv,
+ ObjectStore::Transaction &t)
{
- osd->send_message_osd_cluster(peer, m, from_epoch);
-}
-
-void PrimaryLogPG::send_message_osd_cluster(
- Message *m, Connection *con)
-{
- osd->send_message_osd_cluster(m, con);
-}
-
-void PrimaryLogPG::send_message_osd_cluster(
- Message *m, const ConnectionRef& con)
-{
- osd->send_message_osd_cluster(m, con);
-}
-
-void PrimaryLogPG::on_primary_error(
- const hobject_t &oid,
- eversion_t v)
-{
- dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
- primary_failed(oid);
- primary_error(oid, v);
- backfill_add_missing(oid, v);
-}
-
-void PrimaryLogPG::backfill_add_missing(
- const hobject_t &oid,
- eversion_t v)
-{
- dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
- backfills_in_flight.erase(oid);
- missing_loc.add_missing(oid, v, eversion_t());
+ for (auto &&e: logv) {
+ /* Have to blast all clones, they share a snapset */
+ object_contexts.clear_range(
+ e.soid.get_object_boundary(), e.soid.get_head());
+ ceph_assert(
+ snapset_contexts.find(e.soid.get_head()) ==
+ snapset_contexts.end());
+ }
}
bool PrimaryLogPG::should_send_op(
const hobject_t &hoid) {
if (peer == get_primary())
return true;
- ceph_assert(peer_info.count(peer));
+ ceph_assert(recovery_state.has_peer_info(peer));
bool should_send =
hoid.pool != (int64_t)info.pgid.pool() ||
hoid <= last_backfill_started ||
- hoid <= peer_info[peer].last_backfill;
+ hoid <= recovery_state.get_peer_info(peer).last_backfill;
if (!should_send) {
- ceph_assert(is_backfill_targets(peer));
+ ceph_assert(is_backfill_target(peer));
dout(10) << __func__ << " issue_repop shipping empty opt to osd." << peer
<< ", object " << hoid
<< " beyond std::max(last_backfill_started "
<< ", peer_info[peer].last_backfill "
- << peer_info[peer].last_backfill << ")" << dendl;
+ << recovery_state.get_peer_info(peer).last_backfill
+ << ")" << dendl;
return should_send;
}
- if (async_recovery_targets.count(peer) && peer_missing[peer].is_missing(hoid)) {
+ if (is_async_recovery_target(peer) &&
+ recovery_state.get_peer_missing(peer).is_missing(hoid)) {
should_send = false;
dout(10) << __func__ << " issue_repop shipping empty opt to osd." << peer
<< ", object " << hoid
bool PrimaryLogPG::is_missing_object(const hobject_t& soid) const
{
- return pg_log.get_missing().get_items().count(soid);
+ return recovery_state.get_pg_log().get_missing().get_items().count(soid);
}
void PrimaryLogPG::maybe_kick_recovery(
{
eversion_t v;
bool work_started = false;
- if (!missing_loc.needs_recovery(soid, &v))
+ if (!recovery_state.get_missing_loc().needs_recovery(soid, &v))
return;
map<hobject_t, ObjectContextRef>::const_iterator p = recovering.find(soid);
if (p != recovering.end()) {
dout(7) << "object " << soid << " v " << v << ", already recovering." << dendl;
- } else if (missing_loc.is_unfound(soid)) {
+ } else if (recovery_state.get_missing_loc().is_unfound(soid)) {
dout(7) << "object " << soid << " v " << v << ", is unfound." << dendl;
} else {
dout(7) << "object " << soid << " v " << v << ", recovering." << dendl;
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
if (is_missing_object(soid)) {
- recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
- } else if (missing_loc.is_deleted(soid)) {
+ recover_missing(soid, v, CEPH_MSG_PRIO_HIGH, h);
+ } else if (recovery_state.get_missing_loc().is_deleted(soid)) {
prep_object_replica_deletes(soid, v, h, &work_started);
} else {
prep_object_replica_pushes(soid, v, h, &work_started);
}
- pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
+ pgbackend->run_recovery_op(h, CEPH_MSG_PRIO_HIGH);
}
}
*/
if (waiting_for_degraded_object.count(soid))
return true;
- if (pg_log.get_missing().get_items().count(soid))
+ if (recovery_state.get_pg_log().get_missing().get_items().count(soid))
return true;
- ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
+ ceph_assert(!get_acting_recovery_backfill().empty());
+ for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
+ i != get_acting_recovery_backfill().end();
++i) {
if (*i == get_primary()) continue;
pg_shard_t peer = *i;
- auto peer_missing_entry = peer_missing.find(peer);
+ auto peer_missing_entry = recovery_state.get_peer_missing().find(peer);
// If an object is missing on an async_recovery_target, return false.
// This will not block the op and the object is async recovered later.
- if (peer_missing_entry != peer_missing.end() &&
+ if (peer_missing_entry != recovery_state.get_peer_missing().end() &&
peer_missing_entry->second.get_items().count(soid)) {
- if (async_recovery_targets.count(peer))
+ if (is_async_recovery_target(peer))
continue;
else
return true;
}
// Object is degraded if after last_backfill AND
// we are backfilling it
- if (is_backfill_targets(peer) &&
- peer_info[peer].last_backfill <= soid &&
+ if (is_backfill_target(peer) &&
+ recovery_state.get_peer_info(peer).last_backfill <= soid &&
last_backfill_started >= soid &&
backfills_in_flight.count(soid))
return true;
bool PrimaryLogPG::is_degraded_on_async_recovery_target(const hobject_t& soid)
{
- for (auto &i: async_recovery_targets) {
- auto peer_missing_entry = peer_missing.find(i);
- if (peer_missing_entry != peer_missing.end() &&
+ for (auto &i: get_async_recovery_targets()) {
+ auto peer_missing_entry = recovery_state.get_peer_missing().find(i);
+ if (peer_missing_entry != recovery_state.get_peer_missing().end() &&
peer_missing_entry->second.get_items().count(soid)) {
dout(30) << __func__ << " " << soid << dendl;
return true;
PG_STATE_BACKFILL_TOOFULL))
return;
- if (pg_log.get_log().approx_size() <
+ if (recovery_state.get_pg_log().get_log().approx_size() <
cct->_conf->osd_max_pg_log_entries *
cct->_conf->osd_force_recovery_pg_log_entries_factor)
return;
// find the oldest missing object
- version_t min_version = pg_log.get_log().head.version;
+ version_t min_version = recovery_state.get_pg_log().get_log().head.version;
hobject_t soid;
- if (!pg_log.get_missing().get_rmissing().empty()) {
- min_version = pg_log.get_missing().get_rmissing().begin()->first;
- soid = pg_log.get_missing().get_rmissing().begin()->second;
+ if (!recovery_state.get_pg_log().get_missing().get_rmissing().empty()) {
+ min_version = recovery_state.get_pg_log().get_missing().get_rmissing().begin()->first;
+ soid = recovery_state.get_pg_log().get_missing().get_rmissing().begin()->second;
}
- ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator it = acting_recovery_backfill.begin();
- it != acting_recovery_backfill.end();
+ ceph_assert(!get_acting_recovery_backfill().empty());
+ for (set<pg_shard_t>::iterator it = get_acting_recovery_backfill().begin();
+ it != get_acting_recovery_backfill().end();
++it) {
if (*it == get_primary()) continue;
pg_shard_t peer = *it;
- auto it_missing = peer_missing.find(peer);
- if (it_missing != peer_missing.end() &&
+ auto it_missing = recovery_state.get_peer_missing().find(peer);
+ if (it_missing != recovery_state.get_peer_missing().end() &&
!it_missing->second.get_rmissing().empty()) {
- const auto& min_obj = peer_missing[peer].get_rmissing().begin();
+ const auto& min_obj = recovery_state.get_peer_missing(peer).get_rmissing().begin();
dout(20) << __func__ << " peer " << peer << " min_version " << min_obj->first
<< " oid " << min_obj->second << dendl;
if (min_version > min_obj->first) {
maybe_kick_recovery(soid);
}
-class PGLSPlainFilter : public PGLSFilter {
- string val;
-public:
- int init(bufferlist::const_iterator ¶ms) override
- {
- try {
- decode(xattr, params);
- decode(val, params);
- } catch (buffer::error &e) {
- return -EINVAL;
- }
-
- return 0;
+bool PrimaryLogPG::check_laggy(OpRequestRef& op)
+{
+ if (!HAVE_FEATURE(recovery_state.get_min_upacting_features(),
+ SERVER_OCTOPUS)) {
+ dout(20) << __func__ << " not all upacting has SERVER_OCTOPUS" << dendl;
+ return true;
}
- ~PGLSPlainFilter() override {}
- bool filter(const hobject_t &obj, bufferlist& xattr_data,
- bufferlist& outdata) override;
-};
+ if (state_test(PG_STATE_WAIT)) {
+ dout(10) << __func__ << " PG is WAIT state" << dendl;
+ } else if (!state_test(PG_STATE_LAGGY)) {
+ auto mnow = osd->get_mnow();
+ auto ru = recovery_state.get_readable_until();
+ if (mnow <= ru) {
+ // not laggy
+ return true;
+ }
+ dout(10) << __func__
+ << " mnow " << mnow
+ << " > readable_until " << ru << dendl;
-class PGLSParentFilter : public PGLSFilter {
- inodeno_t parent_ino;
-public:
- CephContext* cct;
- explicit PGLSParentFilter(CephContext* cct) : cct(cct) {
- xattr = "_parent";
- }
- int init(bufferlist::const_iterator ¶ms) override
- {
- try {
- decode(parent_ino, params);
- } catch (buffer::error &e) {
- return -EINVAL;
+ if (!is_primary()) {
+ osd->reply_op_error(op, -EAGAIN);
+ return false;
}
- generic_dout(0) << "parent_ino=" << parent_ino << dendl;
- return 0;
+ // go to laggy state
+ state_set(PG_STATE_LAGGY);
+ publish_stats_to_osd();
}
- ~PGLSParentFilter() override {}
- bool filter(const hobject_t &obj, bufferlist& xattr_data,
- bufferlist& outdata) override;
-};
+ dout(10) << __func__ << " not readable" << dendl;
+ waiting_for_readable.push_back(op);
+ op->mark_delayed("waiting for readable");
+ return false;
+}
-bool PGLSParentFilter::filter(const hobject_t &obj,
- bufferlist& xattr_data, bufferlist& outdata)
+bool PrimaryLogPG::check_laggy_requeue(OpRequestRef& op)
{
- auto iter = xattr_data.cbegin();
- inode_backtrace_t bt;
-
- generic_dout(0) << "PGLSParentFilter::filter" << dendl;
-
- decode(bt, iter);
-
- vector<inode_backpointer_t>::iterator vi;
- for (vi = bt.ancestors.begin(); vi != bt.ancestors.end(); ++vi) {
- generic_dout(0) << "vi->dirino=" << vi->dirino << " parent_ino=" << parent_ino << dendl;
- if (vi->dirino == parent_ino) {
- encode(*vi, outdata);
- return true;
- }
+ if (!HAVE_FEATURE(recovery_state.get_min_upacting_features(),
+ SERVER_OCTOPUS)) {
+ return true;
}
-
+ if (!state_test(PG_STATE_WAIT) && !state_test(PG_STATE_LAGGY)) {
+ return true; // not laggy
+ }
+ dout(10) << __func__ << " not readable" << dendl;
+ waiting_for_readable.push_front(op);
+ op->mark_delayed("waiting for readable");
return false;
}
-bool PGLSPlainFilter::filter(const hobject_t &obj,
- bufferlist& xattr_data, bufferlist& outdata)
+void PrimaryLogPG::recheck_readable()
{
- if (val.size() != xattr_data.length())
- return false;
-
- if (memcmp(val.c_str(), xattr_data.c_str(), val.size()))
- return false;
-
- return true;
+ if (!is_wait() && !is_laggy()) {
+ dout(20) << __func__ << " wasn't wait or laggy" << dendl;
+ return;
+ }
+ auto mnow = osd->get_mnow();
+ bool pub = false;
+ if (is_wait()) {
+ auto prior_readable_until_ub = recovery_state.get_prior_readable_until_ub();
+ if (mnow < prior_readable_until_ub) {
+ dout(10) << __func__ << " still wait (mnow " << mnow
+ << " < prior_readable_until_ub " << prior_readable_until_ub
+ << ")" << dendl;
+ } else {
+ dout(10) << __func__ << " no longer wait (mnow " << mnow
+ << " >= prior_readable_until_ub " << prior_readable_until_ub
+ << ")" << dendl;
+ state_clear(PG_STATE_WAIT);
+ recovery_state.clear_prior_readable_until_ub();
+ pub = true;
+ }
+ }
+ if (is_laggy()) {
+ auto ru = recovery_state.get_readable_until();
+ if (ru == ceph::signedspan::zero()) {
+ dout(10) << __func__ << " still laggy (mnow " << mnow
+ << ", readable_until zero)" << dendl;
+ } else if (mnow >= ru) {
+ dout(10) << __func__ << " still laggy (mnow " << mnow
+ << " >= readable_until " << ru << ")" << dendl;
+ } else {
+ dout(10) << __func__ << " no longer laggy (mnow " << mnow
+ << " < readable_until " << ru << ")" << dendl;
+ state_clear(PG_STATE_LAGGY);
+ pub = true;
+ }
+ }
+ if (pub) {
+ publish_stats_to_osd();
+ }
+ if (!is_laggy() && !is_wait()) {
+ requeue_ops(waiting_for_readable);
+ }
}
-bool PrimaryLogPG::pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata)
+bool PrimaryLogPG::pgls_filter(const PGLSFilter& filter, const hobject_t& sobj)
{
bufferlist bl;
// If filter has expressed an interest in an xattr, load it.
- if (!filter->get_xattr().empty()) {
+ if (!filter.get_xattr().empty()) {
int ret = pgbackend->objects_get_attr(
sobj,
- filter->get_xattr(),
+ filter.get_xattr(),
&bl);
- dout(0) << "getattr (sobj=" << sobj << ", attr=" << filter->get_xattr() << ") returned " << ret << dendl;
+ dout(0) << "getattr (sobj=" << sobj << ", attr=" << filter.get_xattr() << ") returned " << ret << dendl;
if (ret < 0) {
- if (ret != -ENODATA || filter->reject_empty_xattr()) {
+ if (ret != -ENODATA || filter.reject_empty_xattr()) {
return false;
}
}
}
- return filter->filter(sobj, bl, outdata);
+ return filter.filter(sobj, bl);
}
-int PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter, PGLSFilter **pfilter)
+std::pair<int, std::unique_ptr<const PGLSFilter>>
+PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter)
{
string type;
- PGLSFilter *filter;
+ // storing non-const PGLSFilter for the sake of ::init()
+ std::unique_ptr<PGLSFilter> filter;
try {
decode(type, iter);
}
catch (buffer::error& e) {
- return -EINVAL;
+ return { -EINVAL, nullptr };
}
- if (type.compare("parent") == 0) {
- filter = new PGLSParentFilter(cct);
- } else if (type.compare("plain") == 0) {
- filter = new PGLSPlainFilter();
+ if (type.compare("plain") == 0) {
+ filter = std::make_unique<PGLSPlainFilter>();
} else {
std::size_t dot = type.find(".");
if (dot == std::string::npos || dot == 0 || dot == type.size() - 1) {
- return -EINVAL;
+ return { -EINVAL, nullptr };
}
const std::string class_name = type.substr(0, dot);
const std::string filter_name = type.substr(dot + 1);
ClassHandler::ClassData *cls = NULL;
- int r = osd->class_handler->open_class(class_name, &cls);
+ int r = ClassHandler::get_instance().open_class(class_name, &cls);
if (r != 0) {
derr << "Error opening class '" << class_name << "': "
<< cpp_strerror(r) << dendl;
if (r != -EPERM) // propogate permission error
r = -EINVAL;
- return r;
+ return { r, nullptr };
} else {
ceph_assert(cls);
}
if (class_filter == NULL) {
derr << "Error finding filter '" << filter_name << "' in class "
<< class_name << dendl;
- return -EINVAL;
+ return { -EINVAL, nullptr };
}
- filter = class_filter->fn();
+ filter.reset(class_filter->fn());
if (!filter) {
// Object classes are obliged to return us something, but let's
// give an error rather than asserting out.
derr << "Buggy class " << class_name << " failed to construct "
"filter " << filter_name << dendl;
- return -EINVAL;
+ return { -EINVAL, nullptr };
}
}
if (r < 0) {
derr << "Error initializing filter " << type << ": "
<< cpp_strerror(r) << dendl;
- delete filter;
- return -EINVAL;
+ return { -EINVAL, nullptr };
} else {
// Successfully constructed and initialized, return it.
- *pfilter = filter;
- return 0;
+ return std::make_pair(0, std::move(filter));
}
}
// ==========================================================
-int PrimaryLogPG::do_command(
- cmdmap_t cmdmap,
- ostream& ss,
- bufferlist& idata,
- bufferlist& odata,
- ConnectionRef con,
- ceph_tid_t tid)
+void PrimaryLogPG::do_command(
+ const string_view& orig_prefix,
+ const cmdmap_t& cmdmap,
+ const bufferlist& idata,
+ std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
- string prefix;
string format;
-
- cmd_getval(cct, cmdmap, "format", format);
- boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json"));
-
+ cmd_getval(cmdmap, "format", format);
+ std::unique_ptr<Formatter> f(Formatter::create(
+ format, "json-pretty", "json-pretty"));
+ int ret = 0;
+ stringstream ss; // stderr error message stream
+ bufferlist outbl; // if empty at end, we'll dump formatter as output
+
+ // get final prefix:
+ // - ceph pg <pgid> foo -> prefix=pg, cmd=foo
+ // - ceph tell <pgid> foo -> prefix=foo
+ string prefix(orig_prefix);
string command;
- cmd_getval(cct, cmdmap, "cmd", command);
- if (command == "query") {
+ cmd_getval(cmdmap, "cmd", command);
+ if (command.size()) {
+ prefix = command;
+ }
+
+ if (prefix == "query") {
f->open_object_section("pg");
- f->dump_string("state", pg_state_string(get_state()));
f->dump_stream("snap_trimq") << snap_trimq;
f->dump_unsigned("snap_trimq_len", snap_trimq.size());
- f->dump_unsigned("epoch", get_osdmap_epoch());
- f->open_array_section("up");
- for (vector<int>::iterator p = up.begin(); p != up.end(); ++p)
- f->dump_unsigned("osd", *p);
- f->close_section();
- f->open_array_section("acting");
- for (vector<int>::iterator p = acting.begin(); p != acting.end(); ++p)
- f->dump_unsigned("osd", *p);
- f->close_section();
- if (!backfill_targets.empty()) {
- f->open_array_section("backfill_targets");
- for (set<pg_shard_t>::iterator p = backfill_targets.begin();
- p != backfill_targets.end();
- ++p)
- f->dump_stream("shard") << *p;
- f->close_section();
- }
- if (!async_recovery_targets.empty()) {
- f->open_array_section("async_recovery_targets");
- for (set<pg_shard_t>::iterator p = async_recovery_targets.begin();
- p != async_recovery_targets.end();
- ++p)
- f->dump_stream("shard") << *p;
- f->close_section();
- }
- if (!acting_recovery_backfill.empty()) {
- f->open_array_section("acting_recovery_backfill");
- for (set<pg_shard_t>::iterator p = acting_recovery_backfill.begin();
- p != acting_recovery_backfill.end();
- ++p)
- f->dump_stream("shard") << *p;
- f->close_section();
- }
- f->open_object_section("info");
- _update_calc_stats();
- info.dump(f.get());
- f->close_section();
-
- f->open_array_section("peer_info");
- for (map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
- p != peer_info.end();
- ++p) {
- f->open_object_section("info");
- f->dump_stream("peer") << p->first;
- p->second.dump(f.get());
- f->close_section();
- }
+ recovery_state.dump_peering_state(f.get());
f->close_section();
f->open_array_section("recovery_state");
f->close_section();
f->close_section();
- f->flush(odata);
- return 0;
}
- else if (command == "mark_unfound_lost") {
+
+ else if (prefix == "mark_unfound_lost") {
string mulcmd;
- cmd_getval(cct, cmdmap, "mulcmd", mulcmd);
+ cmd_getval(cmdmap, "mulcmd", mulcmd);
int mode = -1;
if (mulcmd == "revert") {
if (pool.info.is_erasure()) {
ss << "mode must be 'delete' for ec pool";
- return -EINVAL;
+ ret = -EINVAL;
+ goto out;
}
mode = pg_log_entry_t::LOST_REVERT;
} else if (mulcmd == "delete") {
mode = pg_log_entry_t::LOST_DELETE;
} else {
ss << "mode must be 'revert' or 'delete'; mark not yet implemented";
- return -EINVAL;
+ ret = -EINVAL;
+ goto out;
}
ceph_assert(mode == pg_log_entry_t::LOST_REVERT ||
- mode == pg_log_entry_t::LOST_DELETE);
+ mode == pg_log_entry_t::LOST_DELETE);
if (!is_primary()) {
ss << "not primary";
- return -EROFS;
+ ret = -EROFS;
+ goto out;
}
- uint64_t unfound = missing_loc.num_unfound();
+ uint64_t unfound = recovery_state.get_missing_loc().num_unfound();
if (!unfound) {
ss << "pg has no unfound objects";
- return 0; // make command idempotent
+ goto out; // make command idempotent
}
- if (!all_unfound_are_queried_or_lost(get_osdmap())) {
+ if (!recovery_state.all_unfound_are_queried_or_lost(get_osdmap())) {
ss << "pg has " << unfound
<< " unfound objects but we haven't probed all sources, not marking lost";
- return -EINVAL;
+ ret = -EINVAL;
+ goto out;
}
- mark_all_unfound_lost(mode, con, tid);
- return -EAGAIN;
+ mark_all_unfound_lost(mode, on_finish);
+ return;
}
- else if (command == "list_unfound") {
+
+ else if (prefix == "list_unfound") {
hobject_t offset;
string offset_json;
bool show_offset = false;
- if (cmd_getval(cct, cmdmap, "offset", offset_json)) {
+ if (cmd_getval(cmdmap, "offset", offset_json)) {
json_spirit::Value v;
try {
if (!json_spirit::read(offset_json, v))
offset.decode(v);
} catch (std::runtime_error& e) {
ss << "error parsing offset: " << e.what();
- return -EINVAL;
+ ret = -EINVAL;
+ goto out;
}
show_offset = true;
}
offset.dump(f.get());
f->close_section();
}
- auto &needs_recovery_map = missing_loc.get_needs_recovery();
+ auto &needs_recovery_map = recovery_state.get_missing_loc()
+ .get_needs_recovery();
f->dump_int("num_missing", needs_recovery_map.size());
f->dump_int("num_unfound", get_num_unfound());
map<hobject_t, pg_missing_item>::const_iterator p =
{
f->open_array_section("objects");
int32_t num = 0;
- for (; p != needs_recovery_map.end() && num < cct->_conf->osd_command_max_records; ++p) {
- if (missing_loc.is_unfound(p->first)) {
+ for (; p != needs_recovery_map.end() &&
+ num < cct->_conf->osd_command_max_records;
+ ++p) {
+ if (recovery_state.get_missing_loc().is_unfound(p->first)) {
f->open_object_section("object");
{
f->open_object_section("oid");
p->second.dump(f.get()); // have, need keys
{
f->open_array_section("locations");
- for (set<pg_shard_t>::iterator r =
- missing_loc.get_locations(p->first).begin();
- r != missing_loc.get_locations(p->first).end();
- ++r)
- f->dump_stream("shard") << *r;
+ for (auto &&r : recovery_state.get_missing_loc().get_locations(
+ p->first)) {
+ f->dump_stream("shard") << r;
+ }
f->close_section();
}
f->close_section();
}
f->dump_bool("more", p != needs_recovery_map.end());
f->close_section();
- f->flush(odata);
- return 0;
}
- ss << "unknown pg command " << prefix;
- return -EINVAL;
+ else if (prefix == "scrub" ||
+ prefix == "deep_scrub") {
+ bool deep = (prefix == "deep_scrub");
+ int64_t time;
+ cmd_getval(cmdmap, "time", time, (int64_t)0);
+
+ if (is_primary()) {
+ const pg_pool_t *p = &pool.info;
+ double pool_scrub_max_interval = 0;
+ double scrub_max_interval;
+ if (deep) {
+ p->opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &pool_scrub_max_interval);
+ scrub_max_interval = pool_scrub_max_interval > 0 ?
+ pool_scrub_max_interval : g_conf()->osd_deep_scrub_interval;
+ } else {
+ p->opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &pool_scrub_max_interval);
+ scrub_max_interval = pool_scrub_max_interval > 0 ?
+ pool_scrub_max_interval : g_conf()->osd_scrub_max_interval;
+ }
+ // Instead of marking must_scrub force a schedule scrub
+ utime_t stamp = ceph_clock_now();
+ if (time == 0)
+ stamp -= scrub_max_interval;
+ else
+ stamp -= (float)time;
+ stamp -= 100.0; // push back last scrub more for good measure
+ if (deep) {
+ set_last_deep_scrub_stamp(stamp);
+ } else {
+ set_last_scrub_stamp(stamp);
+ }
+ f->open_object_section("result");
+ f->dump_bool("deep", deep);
+ f->dump_stream("stamp") << stamp;
+ f->close_section();
+ } else {
+ ss << "Not primary";
+ ret = -EPERM;
+ }
+ outbl.append(ss.str());
+ }
+
+ else {
+ ret = -ENOSYS;
+ ss << "prefix '" << prefix << "' not implemented";
+ }
+
+ out:
+ if (ret >= 0 && outbl.length() == 0) {
+ f->flush(outbl);
+ }
+ on_finish(ret, ss.str(), outbl);
}
+
// ==========================================================
void PrimaryLogPG::do_pg_op(OpRequestRef op)
{
- // NOTE: this is non-const because we modify the OSDOp.outdata in
- // place
- MOSDOp *m = static_cast<MOSDOp *>(op->get_nonconst_req());
+ const MOSDOp *m = static_cast<const MOSDOp *>(op->get_req());
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
dout(10) << "do_pg_op " << *m << dendl;
int result = 0;
string cname, mname;
- PGLSFilter *filter = NULL;
- bufferlist filter_out;
snapid_t snapid = m->get_snapid();
vector<OSDOp> ops = m->ops;
for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
+ std::unique_ptr<const PGLSFilter> filter;
OSDOp& osd_op = *p;
auto bp = p->indata.cbegin();
switch (p->op.op) {
result = -EINVAL;
break;
}
- if (filter) {
- delete filter;
- filter = NULL;
- }
- result = get_pgls_filter(bp, &filter);
+ std::tie(result, filter) = get_pgls_filter(bp);
if (result < 0)
break;
}
map<hobject_t, pg_missing_item>::const_iterator missing_iter =
- pg_log.get_missing().get_items().lower_bound(current);
+ recovery_state.get_pg_log().get_missing().get_items().lower_bound(current);
vector<hobject_t>::iterator ls_iter = sentries.begin();
hobject_t _max = hobject_t::get_max();
while (1) {
const hobject_t &mcand =
- missing_iter == pg_log.get_missing().get_items().end() ?
+ missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() ?
_max :
missing_iter->first;
const hobject_t &lcand =
if (candidate.get_namespace() == cct->_conf->osd_hit_set_namespace)
continue;
- if (missing_loc.is_deleted(candidate))
+ if (recovery_state.get_missing_loc().is_deleted(candidate))
continue;
// skip wrong namespace
candidate.get_namespace() != m->get_hobj().nspace)
continue;
- if (filter && !pgls_filter(filter, candidate, filter_out))
+ if (filter && !pgls_filter(*filter, candidate))
continue;
dout(20) << "pgnls item 0x" << std::hex
}
if (next.is_max() &&
- missing_iter == pg_log.get_missing().get_items().end() &&
+ missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() &&
ls_iter == sentries.end()) {
result = 1;
}
dout(10) << "pgnls handle=" << response.handle << dendl;
encode(response, osd_op.outdata);
- if (filter)
- encode(filter_out, osd_op.outdata);
dout(10) << " pgnls result=" << result << " outdata.length()="
<< osd_op.outdata.length() << dendl;
}
result = -EINVAL;
break;
}
- if (filter) {
- delete filter;
- filter = NULL;
- }
- result = get_pgls_filter(bp, &filter);
+ std::tie(result, filter) = get_pgls_filter(bp);
if (result < 0)
break;
break;
}
- ceph_assert(snapid == CEPH_NOSNAP || pg_log.get_missing().get_items().empty());
+ ceph_assert(snapid == CEPH_NOSNAP || recovery_state.get_pg_log().get_missing().get_items().empty());
map<hobject_t, pg_missing_item>::const_iterator missing_iter =
- pg_log.get_missing().get_items().lower_bound(current);
+ recovery_state.get_pg_log().get_missing().get_items().lower_bound(current);
vector<hobject_t>::iterator ls_iter = sentries.begin();
hobject_t _max = hobject_t::get_max();
while (1) {
const hobject_t &mcand =
- missing_iter == pg_log.get_missing().get_items().end() ?
+ missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() ?
_max :
missing_iter->first;
const hobject_t &lcand =
if (candidate.get_namespace() != m->get_hobj().nspace)
continue;
- if (missing_loc.is_deleted(candidate))
+ if (recovery_state.get_missing_loc().is_deleted(candidate))
continue;
- if (filter && !pgls_filter(filter, candidate, filter_out))
+ if (filter && !pgls_filter(*filter, candidate))
continue;
response.entries.push_back(make_pair(candidate.oid,
candidate.get_key()));
}
if (next.is_max() &&
- missing_iter == pg_log.get_missing().get_items().end() &&
+ missing_iter == recovery_state.get_pg_log().get_missing().get_items().end() &&
ls_iter == sentries.end()) {
result = 1;
}
response.handle = next;
encode(response, osd_op.outdata);
- if (filter)
- encode(filter_out, osd_op.outdata);
dout(10) << " pgls result=" << result << " outdata.length()="
<< osd_op.outdata.length() << dendl;
}
}
if (is_unreadable_object(oid)) {
wait_for_unreadable_object(oid, op);
- delete filter;
return;
}
result = osd->store->read(ch, ghobject_t(oid), 0, 0, osd_op.outdata);
reply->set_result(result);
reply->set_reply_versions(info.last_update, info.last_user_version);
osd->send_message_osd_client(reply, m->get_connection());
- delete filter;
}
-int PrimaryLogPG::do_scrub_ls(MOSDOp *m, OSDOp *osd_op)
+int PrimaryLogPG::do_scrub_ls(const MOSDOp *m, OSDOp *osd_op)
{
if (m->get_pg() != info.pgid.pgid) {
dout(10) << " scrubls pg=" << m->get_pg() << " != " << info.pgid << dendl;
return r;
}
-void PrimaryLogPG::calc_trim_to()
-{
- size_t target = cct->_conf->osd_min_pg_log_entries;
- if (is_degraded() ||
- state_test(PG_STATE_RECOVERING |
- PG_STATE_RECOVERY_WAIT |
- PG_STATE_BACKFILLING |
- PG_STATE_BACKFILL_WAIT |
- PG_STATE_BACKFILL_TOOFULL)) {
- target = cct->_conf->osd_max_pg_log_entries;
- }
-
- eversion_t limit = std::min(
- min_last_complete_ondisk,
- pg_log.get_can_rollback_to());
- if (limit != eversion_t() &&
- limit != pg_trim_to &&
- pg_log.get_log().approx_size() > target) {
- size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
- cct->_conf->osd_pg_log_trim_max);
- if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
- cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
- return;
- }
- list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
- eversion_t new_trim_to;
- for (size_t i = 0; i < num_to_trim; ++i) {
- new_trim_to = it->version;
- ++it;
- if (new_trim_to > limit) {
- new_trim_to = limit;
- dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
- break;
- }
- }
- dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
- pg_trim_to = new_trim_to;
- assert(pg_trim_to <= pg_log.get_head());
- assert(pg_trim_to <= min_last_complete_ondisk);
- }
-}
-
-void PrimaryLogPG::calc_trim_to_aggressive()
-{
- size_t target = cct->_conf->osd_min_pg_log_entries;
- if (is_degraded() ||
- state_test(PG_STATE_RECOVERING |
- PG_STATE_RECOVERY_WAIT |
- PG_STATE_BACKFILLING |
- PG_STATE_BACKFILL_WAIT |
- PG_STATE_BACKFILL_TOOFULL)) {
- target = cct->_conf->osd_max_pg_log_entries;
- }
- // limit pg log trimming up to the can_rollback_to value
- eversion_t limit = std::min(
- pg_log.get_head(),
- pg_log.get_can_rollback_to());
- dout(10) << __func__ << " limit = " << limit << dendl;
-
- if (limit != eversion_t() &&
- limit != pg_trim_to &&
- pg_log.get_log().approx_size() > target) {
- dout(10) << __func__ << " approx pg log length = "
- << pg_log.get_log().approx_size() << dendl;
- uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target,
- cct->_conf->osd_pg_log_trim_max);
- dout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl;
- if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
- cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
- return;
- }
- auto it = pg_log.get_log().log.begin(); // oldest log entry
- auto rit = pg_log.get_log().log.rbegin();
- eversion_t by_n_to_keep; // start from tail
- eversion_t by_n_to_trim = eversion_t::max(); // start from head
- for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
- i++;
- if (i > target && by_n_to_keep == eversion_t()) {
- by_n_to_keep = rit->version;
- }
- if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
- by_n_to_trim = it->version;
- }
- if (by_n_to_keep != eversion_t() &&
- by_n_to_trim != eversion_t::max()) {
- break;
- }
- }
-
- if (by_n_to_keep == eversion_t()) {
- return;
- }
-
- pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
- dout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
- ceph_assert(pg_trim_to <= pg_log.get_head());
- }
-}
-
PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool,
const map<string,string>& ec_profile, spg_t p) :
PGBackend::build_pg_backend(
_pool.info, ec_profile, this, coll_t(p), ch, o->store, cct)),
object_contexts(o->cct, o->cct->_conf->osd_pg_object_context_cache_count),
- snapset_contexts_lock("PrimaryLogPG::snapset_contexts_lock"),
new_backfill(false),
temp_seq(0),
snap_trimmer_machine(this)
{
- missing_loc.set_backend_predicates(
+ recovery_state.set_backend_predicates(
pgbackend->get_is_readable_predicate(),
pgbackend->get_is_recoverable_predicate());
snap_trimmer_machine.initiate();
void PrimaryLogPG::handle_backoff(OpRequestRef& op)
{
- const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
- SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
+ auto m = op->get_req<MOSDBackoff>();
+ auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
if (!session)
return; // drop it.
hobject_t begin = info.pgid.pgid.get_hobj_start();
const Message *m = op->get_req();
int msg_type = m->get_type();
if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
- SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
+ auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
if (!session)
return; // drop it.
}
}
- if (flushes_in_progress > 0) {
- dout(20) << flushes_in_progress
- << " flushes_in_progress pending "
- << "waiting for flush on " << op << dendl;
+ if (recovery_state.needs_flush()) {
+ dout(20) << "waiting for flush on " << op << dendl;
waiting_for_flush.push_back(op);
op->mark_delayed("waiting for flush");
return;
}
- ceph_assert(is_peered() && flushes_in_progress == 0);
+ ceph_assert(is_peered() && !recovery_state.needs_flush());
if (pgbackend->handle_message(op))
return;
case MSG_OSD_SCRUB_RESERVE:
{
- const MOSDScrubReserve *m =
- static_cast<const MOSDScrubReserve*>(op->get_req());
+ auto m = op->get_req<MOSDScrubReserve>();
switch (m->type) {
case MOSDScrubReserve::REQUEST:
handle_scrub_reserve_request(op);
hobject_t PrimaryLogPG::earliest_backfill() const
{
hobject_t e = hobject_t::get_max();
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
- ++i) {
- pg_shard_t bt = *i;
- map<pg_shard_t, pg_info_t>::const_iterator iter = peer_info.find(bt);
- ceph_assert(iter != peer_info.end());
- if (iter->second.last_backfill < e)
- e = iter->second.last_backfill;
+ for (const pg_shard_t& bt : get_backfill_targets()) {
+ const pg_info_t &pi = recovery_state.get_peer_info(bt);
+ e = std::min(pi.last_backfill, e);
}
return e;
}
dout(20) << __func__ << ": op " << *m << dendl;
- hobject_t head = m->get_hobj();
- head.snap = CEPH_NOSNAP;
+ const hobject_t head = m->get_hobj().get_head();
if (!info.pgid.pgid.contains(
info.pgid.pgid.get_split_bits(pool.info.get_pg_num()), head)) {
bool can_backoff =
m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
- SessionRef session;
+ ceph::ref_t<Session> session;
if (can_backoff) {
session = static_cast<Session*>(m->get_connection()->get_priv().get());
if (!session.get()) {
return;
}
- if (op->rmw_flags == 0) {
- int r = osd->osd->init_op_flags(op);
+ {
+ int r = op->maybe_init_op_info(*get_osdmap());
if (r) {
osd->reply_op_error(op, r);
return;
op->may_read() &&
!(op->may_write() || op->may_cache())) {
// balanced reads; any replica will do
- if (!(is_primary() || is_replica())) {
+ if (!(is_primary() || is_nonprimary())) {
osd->handle_misdirected_op(this, op);
return;
}
}
}
+ if (!check_laggy(op)) {
+ return;
+ }
+
if (!op_has_sufficient_caps(op)) {
osd->reply_op_error(op, -EPERM);
return;
}
if (can_backoff &&
(g_conf()->osd_backoff_on_degraded ||
- (g_conf()->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
+ (g_conf()->osd_backoff_on_unfound &&
+ recovery_state.get_missing_loc().is_unfound(head)))) {
add_backoff(session, head, head);
maybe_kick_recovery(head);
} else {
op->mark_delayed("waiting for scrub");
return;
}
+ if (!check_laggy_requeue(op)) {
+ return;
+ }
// blocked on snap?
if (auto blocked_iter = objects_blocked_on_degraded_snap.find(head);
eversion_t version;
version_t user_version;
int return_code = 0;
+ vector<pg_log_op_return_item_t> op_returns;
bool got = check_in_progress_op(
- m->get_reqid(), &version, &user_version, &return_code);
+ m->get_reqid(), &version, &user_version, &return_code, &op_returns);
if (got) {
dout(3) << __func__ << " dup " << m->get_reqid()
<< " version " << version << dendl;
if (already_complete(version)) {
- osd->reply_op_error(op, return_code, version, user_version);
+ osd->reply_op_error(op, return_code, version, user_version, op_returns);
} else {
dout(10) << " waiting for " << version << " to commit" << dendl;
// always queue ondisk waiters, so that we can requeue if needed
- waiting_for_ondisk[version].emplace_back(op, user_version, return_code);
+ waiting_for_ondisk[version].emplace_back(op, user_version, return_code,
+ op_returns);
op->mark_delayed("waiting for ondisk");
}
return;
hobject_t missing_oid;
// kludge around the fact that LIST_SNAPS sets CEPH_SNAPDIR for LIST_SNAPS
- hobject_t _oid_head;
- if (m->get_snapid() == CEPH_SNAPDIR) {
- _oid_head = m->get_hobj().get_head();
- }
const hobject_t& oid =
- m->get_snapid() == CEPH_SNAPDIR ? _oid_head : m->get_hobj();
+ m->get_snapid() == CEPH_SNAPDIR ? head : m->get_hobj();
// make sure LIST_SNAPS is on CEPH_SNAPDIR and nothing else
for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); ++p) {
return;
}
+ if (!is_primary()) {
+ if (!recovery_state.can_serve_replica_read(oid)) {
+ dout(20) << __func__ << ": oid " << oid
+ << " unstable write on replica, bouncing to primary."
+ << *m << dendl;
+ osd->reply_op_error(op, -EAGAIN);
+ return;
+ } else {
+ dout(20) << __func__ << ": serving replica read on oid" << oid
+ << dendl;
+ }
+ }
+
int r = find_object_context(
oid, &obc, can_create,
m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
}
dout(20) << __func__ << ": find_object_context got error " << r << dendl;
if (op->may_write() &&
- get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+ get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
record_write_error(op, oid, nullptr, r);
} else {
osd->reply_op_error(op, r);
if (r) {
dout(20) << __func__ << " returned an error: " << r << dendl;
- close_op_ctx(ctx);
if (op->may_write() &&
- get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
- record_write_error(op, oid, nullptr, r);
+ get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
+ record_write_error(op, oid, nullptr, r,
+ ctx->op->allows_returnvec() ? ctx : nullptr);
} else {
osd->reply_op_error(op, r);
}
+ close_op_ctx(ctx);
return;
}
ObjectContextRef obc)
{
ceph_assert(obc);
- if (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
- CEPH_OSD_FLAG_IGNORE_REDIRECT) {
+ if (op->get_req<MOSDOp>()->get_flags() & CEPH_OSD_FLAG_IGNORE_REDIRECT) {
dout(20) << __func__ << ": ignoring redirect due to flag" << dendl;
return cache_result_t::NOOP;
}
return cache_result_t::NOOP;
}
- vector<OSDOp> ops = static_cast<const MOSDOp*>(op->get_req())->ops;
+ vector<OSDOp> ops = op->get_req<MOSDOp>()->ops;
for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
OSDOp& osd_op = *p;
ceph_osd_op& op = osd_op.op;
if (op.op == CEPH_OSD_OP_SET_REDIRECT ||
op.op == CEPH_OSD_OP_SET_CHUNK ||
- op.op == CEPH_OSD_OP_TIER_PROMOTE ||
- op.op == CEPH_OSD_OP_UNSET_MANIFEST) {
+ op.op == CEPH_OSD_OP_UNSET_MANIFEST ||
+ op.op == CEPH_OSD_OP_TIER_FLUSH) {
+ return cache_result_t::NOOP;
+ } else if (op.op == CEPH_OSD_OP_TIER_PROMOTE) {
+ bool is_dirty = false;
+ for (auto& p : obc->obs.oi.manifest.chunk_map) {
+ if (p.second.is_dirty()) {
+ is_dirty = true;
+ }
+ }
+ if (is_dirty) {
+ start_flush(OpRequestRef(), obc, true, NULL, std::nullopt);
+ }
return cache_result_t::NOOP;
}
}
op->mark_delayed("waiting for scrub");
return cache_result_t::BLOCKED_RECOVERY;
}
+ if (!check_laggy_requeue(op)) {
+ return cache_result_t::BLOCKED_RECOVERY;
+ }
for (auto& p : obc->obs.oi.manifest.chunk_map) {
if (p.second.is_missing()) {
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ auto m = op->get_req<MOSDOp>();
const object_locator_t oloc = m->get_object_locator();
promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
return cache_result_t::BLOCKED_PROMOTE;
}
}
if (all_dirty) {
- start_flush(OpRequestRef(), obc, true, NULL, boost::none);
+ start_flush(OpRequestRef(), obc, true, NULL, std::nullopt);
}
return cache_result_t::NOOP;
}
void finish(int r) override {
if (r == -ECANCELED)
return;
- pg->lock();
+ std::scoped_lock locker{*pg};
pg->handle_manifest_flush(oid, tid, r, offset, last_offset, lpr);
pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start);
- pg->unlock();
}
};
}
int PrimaryLogPG::start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
- boost::optional<std::function<void()>> &&on_flush)
+ std::optional<std::function<void()>> &&on_flush)
{
auto p = obc->obs.oi.manifest.chunk_map.begin();
FlushOpRef manifest_fop(std::make_shared<FlushOp>());
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
CEPH_OSD_FLAG_RWORDERED;
tgt_length = chunk_data.length();
- pg_pool_t::fingerprint_t fp_algo_t = pool.info.get_fingerprint_type();
- if (iter->second.has_reference() &&
- fp_algo_t != pg_pool_t::TYPE_FINGERPRINT_NONE) {
- switch (fp_algo_t) {
+ if (pg_pool_t::fingerprint_t fp_algo = pool.info.get_fingerprint_type();
+ iter->second.has_reference() &&
+ fp_algo != pg_pool_t::TYPE_FINGERPRINT_NONE) {
+ object_t fp_oid = [fp_algo, &chunk_data]() -> string {
+ switch (fp_algo) {
case pg_pool_t::TYPE_FINGERPRINT_SHA1:
- {
- sha1_digest_t sha1r = chunk_data.sha1();
- object_t fp_oid = sha1r.to_str();
- bufferlist in;
- if (fp_oid != tgt_soid.oid) {
- // decrement old chunk's reference count
- ObjectOperation dec_op;
- cls_chunk_refcount_put_op put_call;
- ::encode(put_call, in);
- dec_op.call("refcount", "chunk_put", in);
- // we don't care dec_op's completion. scrub for dedup will fix this.
- tid = osd->objecter->mutate(
- tgt_soid.oid, oloc, dec_op, snapc,
- ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
- flags, NULL);
- in.clear();
- }
- tgt_soid.oid = fp_oid;
- iter->second.oid = tgt_soid;
- // add data op
- ceph_osd_op osd_op;
- osd_op.extent.offset = 0;
- osd_op.extent.length = chunk_data.length();
- encode(osd_op, in);
- encode(soid, in);
- in.append(chunk_data);
- obj_op.call("cas", "cas_write_or_get", in);
- break;
- }
+ return crypto::digest<crypto::SHA1>(chunk_data).to_str();
+ case pg_pool_t::TYPE_FINGERPRINT_SHA256:
+ return crypto::digest<crypto::SHA256>(chunk_data).to_str();
+ case pg_pool_t::TYPE_FINGERPRINT_SHA512:
+ return crypto::digest<crypto::SHA512>(chunk_data).to_str();
default:
assert(0 == "unrecognized fingerprint type");
- break;
- }
+ return {};
+ }
+ }();
+ bufferlist in;
+ if (fp_oid != tgt_soid.oid) {
+ // decrement old chunk's reference count
+ ObjectOperation dec_op;
+ cls_chunk_refcount_put_op put_call;
+ put_call.source = soid;
+ ::encode(put_call, in);
+ dec_op.call("cas", "chunk_put", in);
+ // we don't care dec_op's completion. scrub for dedup will fix this.
+ tid = osd->objecter->mutate(
+ tgt_soid.oid, oloc, dec_op, snapc,
+ ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+ flags, NULL);
+ in.clear();
+ }
+ tgt_soid.oid = fp_oid;
+ iter->second.oid = tgt_soid;
+ // add data op
+ ceph_osd_op osd_op;
+ osd_op.extent.offset = 0;
+ osd_op.extent.length = chunk_data.length();
+ encode(osd_op, in);
+ encode(soid, in);
+ in.append(chunk_data);
+ obj_op.call("cas", "cas_write_or_get", in);
} else {
obj_op.add_data(CEPH_OSD_OP_WRITE, tgt_offset, tgt_length, chunk_data);
}
fin->last_offset = last_offset;
manifest_fop->chunks++;
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
tid = osd->objecter->mutate(
tgt_soid.oid, oloc, obj_op, snapc,
ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
- flags, new C_OnFinisher(fin, osd->objecter_finishers[n]));
+ flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())));
fin->tid = tid;
manifest_fop->io_tids[iter->first] = tid;
}
void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
- MOSDOpReply *orig_reply, int r)
+ MOSDOpReply *orig_reply, int r,
+ OpContext *ctx_for_op_returns)
{
dout(20) << __func__ << " r=" << r << dendl;
ceph_assert(op->may_write());
- const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
+ const osd_reqid_t &reqid = op->get_req<MOSDOp>()->get_reqid();
mempool::osd_pglog::list<pg_log_entry_t> entries;
entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
get_next_version(), eversion_t(), 0,
reqid, utime_t(), r));
+ if (ctx_for_op_returns) {
+ entries.back().set_op_returns(*ctx_for_op_returns->ops);
+ dout(20) << __func__ << " op_returns=" << entries.back().op_returns << dendl;
+ }
struct OnComplete {
PrimaryLogPG *pg;
{}
void operator()() {
ldpp_dout(pg, 20) << "finished " << __func__ << " r=" << r << dendl;
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ auto m = op->get_req<MOSDOp>();
MOSDOpReply *reply = orig_reply.detach();
- if (reply == nullptr) {
- reply = new MOSDOpReply(m, r, pg->get_osdmap_epoch(),
- flags, true);
- }
ldpp_dout(pg, 10) << " sending commit on " << *m << " " << reply << dendl;
pg->osd->send_message_osd_client(reply, m->get_connection());
}
submit_log_entries(
entries,
std::move(lock_manager),
- boost::optional<std::function<void(void)> >(
+ std::optional<std::function<void(void)> >(
OnComplete(this, op, orig_reply, r)),
op,
r);
if (op &&
op->get_req() &&
op->get_req()->get_type() == CEPH_MSG_OSD_OP &&
- (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
+ (op->get_req<MOSDOp>()->get_flags() &
CEPH_OSD_FLAG_IGNORE_CACHE)) {
dout(20) << __func__ << ": ignoring cache due to flag" << dendl;
return cache_result_t::NOOP;
missing_oid = obc->obs.oi.soid;
}
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ auto m = op->get_req<MOSDOp>();
const object_locator_t oloc = m->get_object_locator();
if (op->need_skip_handle_cache()) {
ceph_abort_msg("unreachable");
return cache_result_t::NOOP;
- case pg_pool_t::CACHEMODE_FORWARD:
- // FIXME: this mode allows requests to be reordered.
- do_cache_redirect(op);
- return cache_result_t::HANDLED_REDIRECT;
-
case pg_pool_t::CACHEMODE_READONLY:
// TODO: clean this case up
if (!obc.get() && r == -ENOENT) {
// crap, there was a failure of some kind
return cache_result_t::NOOP;
- case pg_pool_t::CACHEMODE_READFORWARD:
- // Do writeback to the cache tier for writes
- if (op->may_write() || write_ordered || must_promote) {
- if (agent_state &&
- agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
- dout(20) << __func__ << " cache pool full, waiting" << dendl;
- block_write_on_full_cache(missing_oid, op);
- return cache_result_t::BLOCKED_FULL;
- }
- promote_object(obc, missing_oid, oloc, op, promote_obc);
- return cache_result_t::BLOCKED_PROMOTE;
- }
-
- // If it is a read, we can read, we need to forward it
- do_cache_redirect(op);
- return cache_result_t::HANDLED_REDIRECT;
-
+ case pg_pool_t::CACHEMODE_FORWARD:
+ // this mode is deprecated; proxy instead
case pg_pool_t::CACHEMODE_PROXY:
if (!must_promote) {
if (op->may_write() || op->may_cache() || write_ordered) {
promote_object(obc, missing_oid, oloc, op, promote_obc);
return cache_result_t::BLOCKED_PROMOTE;
+ case pg_pool_t::CACHEMODE_READFORWARD:
+ // this mode is deprecated; proxy instead
case pg_pool_t::CACHEMODE_READPROXY:
// Do writeback to the cache tier for writes
if (op->may_write() || write_ordered || must_promote) {
void PrimaryLogPG::do_cache_redirect(OpRequestRef op)
{
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ auto m = op->get_req<MOSDOp>();
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap_epoch(),
flags, false);
void finish(int r) override {
if (prdop->canceled)
return;
- pg->lock();
+ std::scoped_lock locker{*pg};
if (prdop->canceled) {
- pg->unlock();
return;
}
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_proxy_read(oid, tid, r);
pg->osd->logger->tinc(l_osd_tier_r_lat, ceph_clock_now() - start);
}
- pg->unlock();
}
};
void finish(int r) override {
if (prdop->canceled)
return;
- pg->lock();
+ std::scoped_lock locker{*pg};
if (prdop->canceled) {
- pg->unlock();
return;
}
if (last_peering_reset == pg->get_last_peering_reset()) {
} else {
copy_offset = 0;
}
- prdop->ops[op_index].outdata.copy_in(copy_offset, obj_op->ops[0].outdata.length(),
- obj_op->ops[0].outdata.c_str());
+ prdop->ops[op_index].outdata.begin(copy_offset).copy_in(
+ obj_op->ops[0].outdata.length(),
+ obj_op->ops[0].outdata.c_str());
}
pg->finish_proxy_read(oid, tid, r);
delete obj_op;
}
}
- pg->unlock();
}
};
C_ProxyRead *fin = new C_ProxyRead(this, soid, get_last_peering_reset(),
prdop);
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
ceph_tid_t tid = osd->objecter->read(
soid.oid, oloc, obj_op,
m->get_snapid(), NULL,
- flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+ flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
&prdop->user_version,
&prdop->data_offset,
m->get_features());
osd->logger->inc(l_osd_tier_proxy_read);
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ auto m = op->get_req<MOSDOp>();
OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
ctx->reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0, false);
ctx->user_at_version = prdop->user_version;
void finish(int r) override {
if (pwop->canceled)
return;
- pg->lock();
+ std::scoped_lock locker{*pg};
if (pwop->canceled) {
- pg->unlock();
return;
}
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_proxy_write(oid, tid, r);
}
- pg->unlock();
}
};
if (!(op->may_write() || op->may_cache())) {
flags |= CEPH_OSD_FLAG_RWORDERED;
}
+ if (op->allows_returnvec()) {
+ flags |= CEPH_OSD_FLAG_RETURNVEC;
+ }
+
dout(10) << __func__ << " Start proxy write for " << *m << dendl;
ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
C_ProxyWrite_Commit *fin = new C_ProxyWrite_Commit(
this, soid, get_last_peering_reset(), pwop);
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
ceph_tid_t tid = osd->objecter->mutate(
soid.oid, oloc, obj_op, snapc,
ceph::real_clock::from_ceph_timespec(pwop->mtime),
- flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+ flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
&pwop->user_version, pwop->reqid);
fin->tid = tid;
pwop->objecter_tid = tid;
struct RefCountCallback : public Context {
public:
- PrimaryLogPG *pg;
PrimaryLogPG::OpContext *ctx;
OSDOp& osd_op;
- epoch_t last_peering_reset;
+ bool requeue = false;
- RefCountCallback(PrimaryLogPG *pg, PrimaryLogPG::OpContext *ctx,
- OSDOp &osd_op, epoch_t lpr)
- : pg(pg), ctx(ctx), osd_op(osd_op), last_peering_reset(lpr)
- {}
+ RefCountCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+ : ctx(ctx), osd_op(osd_op) {}
void finish(int r) override {
- pg->lock();
- if (last_peering_reset == pg->get_last_peering_reset()) {
- if (r >= 0) {
- osd_op.rval = 0;
- pg->execute_ctx(ctx);
- } else {
- if (ctx->op) {
- pg->osd->reply_op_error(ctx->op, r);
- }
- pg->close_op_ctx(ctx);
+ // NB: caller must already have pg->lock held
+ ctx->obc->stop_block();
+ ctx->pg->kick_object_context_blocked(ctx->obc);
+ if (r >= 0) {
+ osd_op.rval = 0;
+ ctx->pg->execute_ctx(ctx);
+ } else {
+ // on cancel simply toss op out,
+ // or requeue as requested
+ if (r != -ECANCELED) {
+ if (ctx->op)
+ ctx->pg->osd->reply_op_error(ctx->op, r);
+ } else if (requeue) {
+ if (ctx->op)
+ ctx->pg->requeue_op(ctx->op);
}
+ ctx->pg->close_op_ctx(ctx);
}
- pg->unlock();
+ }
+ void set_requeue(bool rq) {
+ requeue = rq;
}
};
}
};
+struct C_SetManifestRefCountDone : public Context {
+ RefCountCallback* cb;
+ hobject_t soid;
+ C_SetManifestRefCountDone(
+ RefCountCallback* cb, hobject_t soid) : cb(cb), soid(soid) {}
+ void finish(int r) override {
+ if (r == -ECANCELED)
+ return;
+ auto pg = cb->ctx->pg;
+ std::scoped_lock locker{*pg};
+ auto it = pg->manifest_ops.find(soid);
+ if (it == pg->manifest_ops.end()) {
+ // raced with cancel_manifest_ops
+ return;
+ }
+ pg->manifest_ops.erase(it);
+ cb->complete(r);
+ }
+};
+
+void PrimaryLogPG::cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids)
+{
+ dout(10) << __func__ << dendl;
+ auto p = manifest_ops.begin();
+ while (p != manifest_ops.end()) {
+ auto mop = p->second;
+ // cancel objecter op, if we can
+ if (mop->objecter_tid) {
+ tids->push_back(mop->objecter_tid);
+ mop->objecter_tid = 0;
+ }
+ mop->cb->set_requeue(requeue);
+ mop->cb->complete(-ECANCELED);
+ manifest_ops.erase(p++);
+ }
+}
+
void PrimaryLogPG::refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
- SnapContext snapc, bool get, Context *cb, uint64_t offset)
+ SnapContext snapc, bool get, RefCountCallback *cb, uint64_t offset)
{
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
CEPH_OSD_FLAG_RWORDERED;
obj_op.call("cas", "chunk_put", in);
}
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
- Context *c;
+ Context *c = nullptr;
if (cb) {
- c = new C_OnFinisher(cb, osd->objecter_finishers[n]);
- } else {
- c = NULL;
+ C_SetManifestRefCountDone *fin =
+ new C_SetManifestRefCountDone(cb, obc->obs.oi.soid);
+ c = new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard()));
}
- osd->objecter->mutate(
+ auto tid = osd->objecter->mutate(
soid.oid, oloc, obj_op, snapc,
ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
flags, c);
+ if (cb) {
+ manifest_ops[obc->obs.oi.soid] = std::make_shared<ManifestOp>(cb, tid);
+ obc->start_block();
+ }
}
void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
fin->obc = obc;
fin->req_total_len = req_total_len;
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
ceph_tid_t tid = osd->objecter->read(
soid.oid, oloc, obj_op,
m->get_snapid(), NULL,
- flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+ flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
&prdop->user_version,
&prdop->data_offset,
m->get_features());
osd->logger->inc(l_osd_tier_proxy_write);
- const MOSDOp *m = static_cast<const MOSDOp*>(pwop->op->get_req());
+ auto m = pwop->op->get_req<MOSDOp>();
ceph_assert(m != NULL);
if (!pwop->sent_reply) {
// send commit.
- MOSDOpReply *reply = pwop->ctx->reply;
- if (reply)
- pwop->ctx->reply = NULL;
- else {
- reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0, true);
- reply->set_reply_versions(eversion_t(), pwop->user_version);
- }
+ assert(pwop->ctx->reply == nullptr);
+ MOSDOpReply *reply = new MOSDOpReply(m, r, get_osdmap_epoch(), 0,
+ true /* we claim it below */);
+ reply->set_reply_versions(eversion_t(), pwop->user_version);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ reply->claim_op_out_data(pwop->ops);
dout(10) << " sending commit on " << pwop << " " << reply << dendl;
osd->send_message_osd_client(reply, m->get_connection());
pwop->sent_reply = true;
}
return;
}
+ if (op && !check_laggy_requeue(op)) {
+ return;
+ }
if (!obc) { // we need to create an ObjectContext
ceph_assert(missing_oid != hobject_t());
obc = get_object_context(missing_oid, true);
if (op)
wait_for_blocked_object(obc->obs.oi.soid, op);
- info.stats.stats.sum.num_promote++;
+
+ recovery_state.update_stats(
+ [](auto &history, auto &stats) {
+ stats.stats.sum.num_promote++;
+ return false;
+ });
}
void PrimaryLogPG::execute_ctx(OpContext *ctx)
ctx->reset_obs(ctx->obc);
ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
OpRequestRef op = ctx->op;
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ auto m = op->get_req<MOSDOp>();
ObjectContextRef obc = ctx->obc;
const hobject_t& soid = obc->obs.oi.soid;
return;
}
- bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
- // prepare the reply
- ctx->reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0,
- successful_write);
-
- // Write operations aren't allowed to return a data payload because
- // we can't do so reliably. If the client has to resend the request
- // and it has already been applied, we will return 0 with no
- // payload. Non-deterministic behavior is no good. However, it is
- // possible to construct an operation that does a read, does a guard
- // check (e.g., CMPXATTR), and then a write. Then we either succeed
- // with the write, or return a CMPXATTR and the read value.
- if (successful_write) {
- // write. normalize the result code.
- dout(20) << " zeroing write result code " << result << dendl;
- result = 0;
+ bool ignore_out_data = false;
+ if (!ctx->op_t->empty() &&
+ op->may_write() &&
+ result >= 0) {
+ // successful update
+ if (ctx->op->allows_returnvec()) {
+ // enforce reasonable bound on the return buffer sizes
+ for (auto& i : *ctx->ops) {
+ if (i.outdata.length() > cct->_conf->osd_max_write_op_reply_len) {
+ dout(10) << __func__ << " op " << i << " outdata overflow" << dendl;
+ result = -EOVERFLOW; // overall result is overflow
+ i.rval = -EOVERFLOW;
+ i.outdata.clear();
+ }
+ }
+ } else {
+ // legacy behavior -- zero result and return data etc.
+ ignore_out_data = true;
+ result = 0;
+ }
}
- ctx->reply->set_result(result);
+
+ // prepare the reply
+ ctx->reply = new MOSDOpReply(m, result, get_osdmap_epoch(), 0,
+ ignore_out_data);
+ dout(20) << __func__ << " alloc reply " << ctx->reply
+ << " result " << result << dendl;
// read or error?
if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
ceph_assert(op->may_write() || op->may_cache());
// trim log?
- if (hard_limit_pglog())
- calc_trim_to_aggressive();
- else
- calc_trim_to();
+ recovery_state.update_trim_to();
// verify that we are doing this in order?
if (cct->_conf->osd_debug_op_order && m->get_source().is_client() &&
// save just what we need from ctx
MOSDOpReply *reply = ctx->reply;
ctx->reply = nullptr;
- reply->claim_op_out_data(*ctx->ops);
reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
- close_op_ctx(ctx);
if (result == -ENOENT) {
reply->set_enoent_reply_versions(info.last_update,
}
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
// append to pg log for dup detection - don't save buffers for now
- record_write_error(op, soid, reply, result);
+ record_write_error(op, soid, reply, result,
+ ctx->op->allows_returnvec() ? ctx : nullptr);
+ close_op_ctx(ctx);
return;
}
if (m && !ctx->sent_reply) {
MOSDOpReply *reply = ctx->reply;
- if (reply)
- ctx->reply = nullptr;
- else {
- reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0, true);
- reply->set_reply_versions(ctx->at_version,
- ctx->user_at_version);
- }
+ ctx->reply = nullptr;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending reply on " << *m << " " << reply << dendl;
osd->send_message_osd_client(reply, m->get_connection());
close_op_ctx(ctx);
}
-void PrimaryLogPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
-{
- if (ctx->op)
- osd->reply_op_error(ctx->op, r, v, uv);
- close_op_ctx(ctx);
-}
-
void PrimaryLogPG::log_op_stats(const OpRequest& op,
const uint64_t inb,
const uint64_t outb)
{
- const MOSDOp* const m = static_cast<const MOSDOp*>(op.get_req());
+ auto m = op.get_req<MOSDOp>();
const utime_t now = ceph_clock_now();
const utime_t latency = now - m->get_recv_stamp();
OpRequestRef op,
ThreadPool::TPHandle &handle)
{
- const MOSDPGScan *m = static_cast<const MOSDPGScan*>(op->get_req());
+ auto m = op->get_req<MOSDPGScan>();
ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
dout(10) << "do_scan " << *m << dendl;
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- BackfillTooFull())));
+ PeeringState::BackfillTooFull())));
return;
}
pg_shard_t from = m->from;
// Check that from is in backfill_targets vector
- ceph_assert(is_backfill_targets(from));
+ ceph_assert(is_backfill_target(from));
BackfillInterval& bi = peer_backfill_info[from];
bi.begin = m->begin;
if (waiting_on_backfill.erase(from)) {
if (waiting_on_backfill.empty()) {
- ceph_assert(peer_backfill_info.size() == backfill_targets.size());
+ ceph_assert(
+ peer_backfill_info.size() ==
+ get_backfill_targets().size());
finish_recovery_op(hobject_t::get_max());
}
} else {
void PrimaryLogPG::do_backfill(OpRequestRef op)
{
- const MOSDPGBackfill *m = static_cast<const MOSDPGBackfill*>(op->get_req());
+ auto m = op->get_req<MOSDPGBackfill>();
ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
dout(10) << "do_backfill " << *m << dendl;
{
ceph_assert(cct->_conf->osd_kill_backfill_at != 2);
- info.set_last_backfill(m->last_backfill);
- // During backfill submit_push_data() tracks num_bytes which is needed in case
- // backfill stops and starts again. We want to know how many bytes this
- // pg is consuming on the disk in order to compute amount of new data
- // reserved to hold backfill if it won't fit.
- if (m->op == MOSDPGBackfill::OP_BACKFILL_PROGRESS) {
- dout(25) << __func__ << " primary " << m->stats.stats.sum.num_bytes << " local " << info.stats.stats.sum.num_bytes << dendl;
- int64_t bytes = info.stats.stats.sum.num_bytes;
- info.stats = m->stats;
- info.stats.stats.sum.num_bytes = bytes;
- } else {
- dout(20) << __func__ << " final " << m->stats.stats.sum.num_bytes << " replaces local " << info.stats.stats.sum.num_bytes << dendl;
- info.stats = m->stats;
- }
-
ObjectStore::Transaction t;
- dirty_info = true;
- write_if_dirty(t);
+ recovery_state.update_backfill_progress(
+ m->last_backfill,
+ m->stats,
+ m->op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
+ t);
+
int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
ceph_assert(tr == 0);
}
}
int PrimaryLogPG::trim_object(
- bool first, const hobject_t &coid, PrimaryLogPG::OpContextUPtr *ctxp)
+ bool first, const hobject_t &coid, snapid_t snap_to_trim,
+ PrimaryLogPG::OpContextUPtr *ctxp)
{
*ctxp = NULL;
}
set<snapid_t> new_snaps;
+ const OSDMapRef& osdmap = get_osdmap();
for (set<snapid_t>::iterator i = old_snaps.begin();
i != old_snaps.end();
++i) {
- if (!pool.info.is_removed_snap(*i))
+ if (!osdmap->in_removed_snaps_queue(info.pgid.pgid.pool(), *i) &&
+ *i != snap_to_trim) {
new_snaps.insert(*i);
+ }
}
vector<snapid_t>::iterator p = snapset.clones.end();
head_obc->obs.oi = object_info_t(head_oid);
t->remove(head_oid);
} else {
- dout(10) << coid << " filtering snapset on " << head_oid << dendl;
- snapset.filter(pool.info);
+ if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+ // filter SnapSet::snaps for the benefit of pre-octopus
+ // peers. This is perhaps overly conservative in that I'm not
+ // certain they need this, but let's be conservative here.
+ dout(10) << coid << " filtering snapset on " << head_oid << dendl;
+ snapset.filter(pool.info);
+ } else {
+ snapset.snaps.clear();
+ }
dout(10) << coid << " writing updated snapset on " << head_oid
<< ", snapset is " << snapset << dendl;
ctx->log.push_back(
void PrimaryLogPG::snap_trimmer(epoch_t queued)
{
- if (deleting || pg_has_reset_since(queued)) {
+ if (recovery_state.is_deleting() || pg_has_reset_since(queued)) {
return;
}
newop.op.extent.length = obl.length();
newop.indata = obl;
do_osd_ops(ctx, nops);
- osd_op.outdata.claim(newop.outdata);
return 0;
}
last_in_key = key;
dout(10) << "tmapup op " << (int)op << " key " << key << dendl;
-
+
// skip existing intervening keys
bool key_exists = false;
while (have_next && !key_exists) {
newop.op.extent.length = obl.length();
newop.indata = obl;
do_osd_ops(ctx, nops);
- osd_op.outdata.claim(newop.outdata);
}
}
return result;
ceph_le64 *r;
int32_t *rval;
bufferlist *outdatap;
- boost::optional<uint32_t> maybe_crc;
+ std::optional<uint32_t> maybe_crc;
uint64_t size;
OSDService *osd;
hobject_t soid;
- __le32 flags;
+ uint32_t flags;
FillInVerifyExtent(ceph_le64 *r, int32_t *rv, bufferlist *blp,
- boost::optional<uint32_t> mc, uint64_t size,
- OSDService *osd, hobject_t soid, __le32 flags) :
+ std::optional<uint32_t> mc, uint64_t size,
+ OSDService *osd, hobject_t soid, uint32_t flags) :
r(r), rval(rv), outdatap(blp), maybe_crc(mc),
size(size), osd(osd), soid(soid), flags(flags) {}
void finish(int len) override {
C_ChecksumRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
Checksummer::CSumType csum_type, bufferlist &&init_value_bl,
- boost::optional<uint32_t> maybe_crc, uint64_t size,
- OSDService *osd, hobject_t soid, __le32 flags)
+ std::optional<uint32_t> maybe_crc, uint64_t size,
+ OSDService *osd, hobject_t soid, uint32_t flags)
: primary_log_pg(primary_log_pg), osd_op(osd_op),
csum_type(csum_type), init_value_bl(std::move(init_value_bl)),
fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
bufferlist init_value_bl;
init_value_bl.substr_of(bl_it->get_bl(), bl_it->get_off(),
csum_init_value_size);
- bl_it->advance(csum_init_value_size);
+ *bl_it += csum_init_value_size;
if (pool.info.is_erasure() && op.checksum.length > 0) {
// If there is a data digest and it is possible we are reading
// entire object, pass the digest.
- boost::optional<uint32_t> maybe_crc;
+ std::optional<uint32_t> maybe_crc;
if (oi.is_data_digest() && op.checksum.offset == 0 &&
op.checksum.length >= oi.size) {
maybe_crc = oi.data_digest;
Context *fill_extent_ctx;
C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
- boost::optional<uint32_t> maybe_crc, uint64_t size,
- OSDService *osd, hobject_t soid, __le32 flags)
+ std::optional<uint32_t> maybe_crc, uint64_t size,
+ OSDService *osd, hobject_t soid, uint32_t flags)
: primary_log_pg(primary_log_pg), osd_op(osd_op),
fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
&read_bl, maybe_crc, size,
} else if (pool.info.is_erasure()) {
// If there is a data digest and it is possible we are reading
// entire object, pass the digest.
- boost::optional<uint32_t> maybe_crc;
+ std::optional<uint32_t> maybe_crc;
if (oi.is_data_digest() && op.checksum.offset == 0 &&
op.checksum.length >= oi.size) {
maybe_crc = oi.data_digest;
} else if (pool.info.is_erasure()) {
// The initialisation below is required to silence a false positive
// -Wmaybe-uninitialized warning
- boost::optional<uint32_t> maybe_crc = boost::make_optional(false, uint32_t());
+ std::optional<uint32_t> maybe_crc;
// If there is a data digest and it is possible we are reading
// entire object, pass the digest. FillInVerifyExtent will
// will check the oi.size again.
return r;
}
- map<uint64_t, uint64_t>::iterator miter;
bufferlist data_bl;
- uint64_t last = op.extent.offset;
- for (miter = m.begin(); miter != m.end(); ++miter) {
- // verify hole?
- if (cct->_conf->osd_verify_sparse_read_holes &&
- last < miter->first) {
- bufferlist t;
- uint64_t len = miter->first - last;
- r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
- if (r < 0) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read failed to read: "
- << r;
- } else if (!t.is_zero()) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read found data in hole "
- << last << "~" << len;
- }
- }
-
- bufferlist tmpbl;
- r = pgbackend->objects_read_sync(soid, miter->first, miter->second,
- op.flags, &tmpbl);
- if (r == -EIO) {
- r = rep_repair_primary_object(soid, ctx);
- }
- if (r < 0) {
- return r;
- }
-
- // this is usually happen when we get extent that exceeds the actual file
- // size
- if (r < (int)miter->second)
- miter->second = r;
- total_read += r;
- dout(10) << "sparse-read " << miter->first << "@" << miter->second
- << dendl;
- data_bl.claim_append(tmpbl);
- last = miter->first + r;
- }
-
- // verify trailing hole?
- if (cct->_conf->osd_verify_sparse_read_holes) {
- uint64_t end = std::min<uint64_t>(op.extent.offset + op.extent.length,
- oi.size);
- if (last < end) {
- bufferlist t;
- uint64_t len = end - last;
- r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
- if (r < 0) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read failed to read: " << r;
- } else if (!t.is_zero()) {
- osd->clog->error() << coll << " " << soid
- << " sparse-read found data in hole "
- << last << "~" << len;
- }
- }
+ r = pgbackend->objects_readv_sync(soid, std::move(m), op.flags, &data_bl);
+ if (r == -EIO) {
+ r = rep_repair_primary_object(soid, ctx);
+ }
+ if (r < 0) {
+ return r;
}
// Why SPARSE_READ need checksum? In fact, librbd always use sparse-read.
// Maybe at first, there is no much whole objects. With continued use, more
// and more whole object exist. So from this point, for spare-read add
// checksum make sense.
- if (total_read == oi.size && oi.is_data_digest()) {
+ if ((uint64_t)r == oi.size && oi.is_data_digest()) {
uint32_t crc = data_bl.crc32c(-1);
if (oi.data_digest != crc) {
osd->clog->error() << info.pgid << std::hex
encode(m, osd_op.outdata); // re-encode since it might be modified
::encode_destructively(data_bl, osd_op.outdata);
- dout(10) << " sparse_read got " << total_read << " bytes from object "
+ dout(10) << " sparse_read got " << r << " bytes from object "
<< soid << dendl;
}
}
}
- // TODO: check endianness (__le32 vs uint32_t, etc.)
+ // TODO: check endianness (ceph_le32 vs uint32_t, etc.)
// The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
// but the code in this function seems to treat them as native-endian. What should the
// tracepoints do?
case CEPH_OSD_OP_CACHE_TRY_FLUSH:
case CEPH_OSD_OP_UNDIRTY:
case CEPH_OSD_OP_COPY_FROM: // we handle user_version update explicitly
+ case CEPH_OSD_OP_COPY_FROM2:
case CEPH_OSD_OP_CACHE_PIN:
case CEPH_OSD_OP_CACHE_UNPIN:
case CEPH_OSD_OP_SET_REDIRECT:
case CEPH_OSD_OP_TIER_PROMOTE:
+ case CEPH_OSD_OP_TIER_FLUSH:
break;
default:
if (op.op & CEPH_OSD_OP_MODE_WR)
tracepoint(osd, do_osd_op_pre_call, soid.oid.name.c_str(), soid.snap.val, cname.c_str(), mname.c_str());
ClassHandler::ClassData *cls;
- result = osd->class_handler->open_class(cname, &cls);
+ result = ClassHandler::get_instance().open_class(cname, &cls);
ceph_assert(result == 0); // init_op_flags() already verified this works.
- ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
+ ClassHandler::ClassMethod *method = cls->get_method(mname);
if (!method) {
dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl;
result = -EOPNOTSUPP;
case CEPH_OSD_OP_UNDIRTY:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_undirty, soid.oid.name.c_str(), soid.snap.val);
if (oi.is_dirty()) {
ctx->modify = true;
ctx->delta_stats.num_wr++;
}
- result = 0;
}
break;
case CEPH_OSD_OP_CACHE_TRY_FLUSH:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_try_flush, soid.oid.name.c_str(), soid.snap.val);
- if (ctx->lock_type != ObjectContext::RWState::RWNONE) {
+ if (ctx->lock_type != RWState::RWNONE) {
dout(10) << "cache-try-flush without SKIPRWLOCKS flag set" << dendl;
result = -EINVAL;
break;
break;
}
if (oi.is_dirty()) {
- result = start_flush(ctx->op, ctx->obc, false, NULL, boost::none);
+ result = start_flush(ctx->op, ctx->obc, false, NULL, std::nullopt);
if (result == -EINPROGRESS)
result = -EAGAIN;
} else {
case CEPH_OSD_OP_CACHE_FLUSH:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_cache_flush, soid.oid.name.c_str(), soid.snap.val);
- if (ctx->lock_type == ObjectContext::RWState::RWNONE) {
+ if (ctx->lock_type == RWState::RWNONE) {
dout(10) << "cache-flush with SKIPRWLOCKS flag set" << dendl;
result = -EINVAL;
break;
}
hobject_t missing;
if (oi.is_dirty()) {
- result = start_flush(ctx->op, ctx->obc, true, &missing, boost::none);
+ result = start_flush(ctx->op, ctx->obc, true, &missing, std::nullopt);
if (result == -EINPROGRESS)
result = -EAGAIN;
} else {
case CEPH_OSD_OP_CACHE_EVICT:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_cache_evict, soid.oid.name.c_str(), soid.snap.val);
if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
notify_info_t n;
n.timeout = timeout;
n.notify_id = osd->get_next_id(get_osdmap_epoch());
- n.cookie = op.watch.cookie;
+ n.cookie = op.notify.cookie;
n.bl = bl;
ctx->notifies.push_back(n);
case CEPH_OSD_OP_SETALLOCHINT:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_setallochint, soid.oid.name.c_str(), soid.snap.val, op.alloc_hint.expected_object_size, op.alloc_hint.expected_write_size);
maybe_create_new_object(ctx);
t->set_alloc_hint(soid, op.alloc_hint.expected_object_size,
op.alloc_hint.expected_write_size,
op.alloc_hint.flags);
- result = 0;
}
break;
case CEPH_OSD_OP_WRITE:
++ctx->num_write;
+ result = 0;
{ // write
__u32 seq = oi.truncate_seq;
tracepoint(osd, do_osd_op_pre_write, soid.oid.name.c_str(), soid.snap.val, oi.size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
trim.insert(op.extent.truncate_size,
oi.size - op.extent.truncate_size);
ctx->modified_ranges.union_of(trim);
+ ctx->clean_regions.mark_data_region_dirty(op.extent.truncate_size, oi.size - op.extent.truncate_size);
}
if (op.extent.truncate_size != oi.size) {
truncate_update_size_and_usage(ctx->delta_stats,
}
write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
op.extent.offset, op.extent.length);
-
+ ctx->clean_regions.mark_data_region_dirty(op.extent.offset, op.extent.length);
+ dout(10) << "clean_regions modified" << ctx->clean_regions << dendl;
}
break;
case CEPH_OSD_OP_WRITEFULL:
++ctx->num_write;
+ result = 0;
{ // write full object
tracepoint(osd, do_osd_op_pre_writefull, soid.oid.name.c_str(), soid.snap.val, oi.size, 0, op.extent.length);
} else {
obs.oi.clear_data_digest();
}
-
+ ctx->clean_regions.mark_data_region_dirty(0,
+ std::max((uint64_t)op.extent.length, oi.size));
write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
0, op.extent.length, true);
}
interval_set<uint64_t> ch;
ch.insert(op.extent.offset, op.extent.length);
ctx->modified_ranges.union_of(ch);
+ ctx->clean_regions.mark_data_region_dirty(op.extent.offset, op.extent.length);
ctx->delta_stats.num_wr++;
oi.clear_data_digest();
} else {
break;
case CEPH_OSD_OP_CREATE:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_create, soid.oid.name.c_str(), soid.snap.val);
- int flags = le32_to_cpu(op.flags);
if (obs.exists && !oi.is_whiteout() &&
- (flags & CEPH_OSD_OP_FLAG_EXCL)) {
+ (op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
result = -EEXIST; /* this is an exclusive create */
} else {
if (osd_op.indata.length()) {
}
// category is no longer implemented.
}
- if (result >= 0) {
- maybe_create_new_object(ctx);
- t->nop(soid);
- }
+ maybe_create_new_object(ctx);
+ t->nop(soid);
}
}
break;
break;
}
++ctx->num_write;
+ result = 0;
{
// truncate
if (!obs.exists || oi.is_whiteout()) {
interval_set<uint64_t> trim;
trim.insert(op.extent.offset, oi.size-op.extent.offset);
ctx->modified_ranges.union_of(trim);
- }
+ ctx->clean_regions.mark_data_region_dirty(op.extent.offset, oi.size - op.extent.offset);
+ } else if (oi.size < op.extent.offset) {
+ ctx->clean_regions.mark_data_region_dirty(oi.size, op.extent.offset - oi.size);
+ }
if (op.extent.offset != oi.size) {
truncate_update_size_and_usage(ctx->delta_stats,
oi,
case CEPH_OSD_OP_DELETE:
++ctx->num_write;
+ result = 0;
tracepoint(osd, do_osd_op_pre_delete, soid.oid.name.c_str(), soid.snap.val);
{
if (oi.has_manifest()) {
case CEPH_OSD_OP_WATCH:
++ctx->num_write;
+ result = 0;
{
tracepoint(osd, do_osd_op_pre_watch, soid.oid.name.c_str(), soid.snap.val,
op.watch.cookie, op.watch.op);
result = -ENOENT;
break;
}
+ result = 0;
uint64_t cookie = op.watch.cookie;
entity_name_t entity = ctx->reqid.name;
ObjectContextRef obc = ctx->obc;
break;
}
++ctx->num_write;
+ result = 0;
{
if (!obs.exists || oi.is_whiteout()) {
result = -ENOENT;
ctx->delta_stats.num_objects_pinned++;
ctx->delta_stats.num_wr++;
}
- result = 0;
}
break;
break;
}
++ctx->num_write;
+ result = 0;
{
if (!obs.exists || oi.is_whiteout()) {
result = -ENOENT;
ctx->delta_stats.num_objects_pinned--;
ctx->delta_stats.num_wr++;
}
- result = 0;
}
break;
case CEPH_OSD_OP_SET_REDIRECT:
++ctx->num_write;
+ result = 0;
{
if (pool.info.is_tier()) {
result = -EINVAL;
result = -ENOENT;
break;
}
- if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
result = -EOPNOTSUPP;
break;
}
// start
ctx->op_finishers[ctx->current_osd_subop_num].reset(
new SetManifestFinisher(osd_op));
- RefCountCallback *fin = new RefCountCallback(
- this, ctx, osd_op, get_last_peering_reset());
+ RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
refcount_manifest(ctx->obc, target_oloc, target, SnapContext(),
true, fin, 0);
result = -EINPROGRESS;
oi.manifest.redirect_target = target;
oi.manifest.type = object_manifest_t::TYPE_REDIRECT;
t->truncate(soid, 0);
+ ctx->clean_regions.mark_data_region_dirty(0, oi.size);
if (oi.is_omap() && pool.info.supports_omap()) {
t->omap_clear(soid);
obs.oi.clear_omap_digest();
obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+ ctx->clean_regions.mark_omap_dirty();
}
+ write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
+ 0, oi.size, false);
ctx->delta_stats.num_bytes -= oi.size;
oi.size = 0;
oi.new_object();
case CEPH_OSD_OP_SET_CHUNK:
++ctx->num_write;
+ result = 0;
{
if (pool.info.is_tier()) {
result = -EINVAL;
result = -ENOENT;
break;
}
- if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
result = -EOPNOTSUPP;
break;
}
// start
ctx->op_finishers[ctx->current_osd_subop_num].reset(
new SetManifestFinisher(osd_op));
- RefCountCallback *fin = new RefCountCallback(
- this, ctx, osd_op, get_last_peering_reset());
+ RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
refcount_manifest(ctx->obc, tgt_oloc, target, SnapContext(),
true, fin, src_offset);
result = -EINPROGRESS;
case CEPH_OSD_OP_TIER_PROMOTE:
++ctx->num_write;
+ result = 0;
{
if (pool.info.is_tier()) {
result = -EINVAL;
result = -ENOENT;
break;
}
- if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
result = -EOPNOTSUPP;
break;
}
break;
+ case CEPH_OSD_OP_TIER_FLUSH:
+ ++ctx->num_write;
+ result = 0;
+ {
+ if (pool.info.is_tier()) {
+ result = -EINVAL;
+ break;
+ }
+ if (!obs.exists) {
+ result = -ENOENT;
+ break;
+ }
+ if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+ result = -EOPNOTSUPP;
+ break;
+ }
+ if (!obs.oi.has_manifest()) {
+ result = 0;
+ break;
+ }
+
+ hobject_t missing;
+ bool is_dirty = false;
+ for (auto& p : ctx->obc->obs.oi.manifest.chunk_map) {
+ if (p.second.is_dirty()) {
+ is_dirty = true;
+ break;
+ }
+ }
+
+ if (is_dirty) {
+ result = start_flush(ctx->op, ctx->obc, true, NULL, std::nullopt);
+ if (result == -EINPROGRESS)
+ result = -EAGAIN;
+ } else {
+ result = 0;
+ }
+ }
+
+ break;
+
case CEPH_OSD_OP_UNSET_MANIFEST:
++ctx->num_write;
+ result = 0;
{
if (pool.info.is_tier()) {
result = -EINVAL;
result = -EOPNOTSUPP;
break;
}
- if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ if (get_osdmap()->require_osd_release < ceph_release_t::luminous) {
result = -EOPNOTSUPP;
break;
}
case CEPH_OSD_OP_SETXATTR:
++ctx->num_write;
+ result = 0;
{
if (cct->_conf->osd_max_attr_size > 0 &&
op.xattr.value_len > cct->_conf->osd_max_attr_size) {
case CEPH_OSD_OP_RMXATTR:
++ctx->num_write;
+ result = 0;
{
string aname;
bp.copy(op.xattr.name_len, aname);
break;
case CEPH_OSD_OP_STARTSYNC:
+ result = 0;
t->nop(soid);
break;
newop.op.op = CEPH_OSD_OP_SYNC_READ;
newop.op.extent.offset = 0;
newop.op.extent.length = 0;
- do_osd_ops(ctx, nops);
+ result = do_osd_ops(ctx, nops);
osd_op.outdata.claim(newop.outdata);
}
break;
break;
}
++ctx->num_write;
+ result = 0;
{
maybe_create_new_object(ctx);
bufferlist to_set_bl;
}
}
t->omap_setkeys(soid, to_set_bl);
+ ctx->clean_regions.mark_omap_dirty();
ctx->delta_stats.num_wr++;
ctx->delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
}
break;
}
++ctx->num_write;
+ result = 0;
{
maybe_create_new_object(ctx);
t->omap_setheader(soid, osd_op.indata);
+ ctx->clean_regions.mark_omap_dirty();
ctx->delta_stats.num_wr++;
}
obs.oi.set_flag(object_info_t::FLAG_OMAP);
break;
}
++ctx->num_write;
+ result = 0;
{
if (!obs.exists || oi.is_whiteout()) {
result = -ENOENT;
}
if (oi.is_omap()) {
t->omap_clear(soid);
+ ctx->clean_regions.mark_omap_dirty();
ctx->delta_stats.num_wr++;
obs.oi.clear_omap_digest();
obs.oi.clear_flag(object_info_t::FLAG_OMAP);
break;
}
++ctx->num_write;
+ result = 0;
{
if (!obs.exists || oi.is_whiteout()) {
result = -ENOENT;
}
tracepoint(osd, do_osd_op_pre_omaprmkeys, soid.oid.name.c_str(), soid.snap.val);
t->omap_rmkeys(soid, to_rm_bl);
+ ctx->clean_regions.mark_omap_dirty();
+ ctx->delta_stats.num_wr++;
+ }
+ obs.oi.clear_omap_digest();
+ break;
+
+ case CEPH_OSD_OP_OMAPRMKEYRANGE:
+ tracepoint(osd, do_osd_op_pre_omaprmkeyrange, soid.oid.name.c_str(), soid.snap.val);
+ if (!pool.info.supports_omap()) {
+ result = -EOPNOTSUPP;
+ break;
+ }
+ ++ctx->num_write;
+ result = 0;
+ {
+ if (!obs.exists || oi.is_whiteout()) {
+ result = -ENOENT;
+ break;
+ }
+ std::string key_begin, key_end;
+ try {
+ decode(key_begin, bp);
+ decode(key_end, bp);
+ } catch (buffer::error& e) {
+ result = -EINVAL;
+ goto fail;
+ }
+ t->omap_rmkeyrange(soid, key_begin, key_end);
ctx->delta_stats.num_wr++;
}
obs.oi.clear_omap_digest();
break;
case CEPH_OSD_OP_COPY_FROM:
+ case CEPH_OSD_OP_COPY_FROM2:
++ctx->num_write;
+ result = 0;
{
object_t src_name;
object_locator_t src_oloc;
+ uint32_t truncate_seq = 0;
+ uint64_t truncate_size = 0;
+ bool have_truncate = false;
snapid_t src_snapid = (uint64_t)op.copy_from.snapid;
version_t src_version = op.copy_from.src_version;
+
+ if ((op.op == CEPH_OSD_OP_COPY_FROM2) &&
+ (op.copy_from.flags & ~CEPH_OSD_COPY_FROM_FLAGS)) {
+ dout(20) << "invalid copy-from2 flags 0x"
+ << std::hex << (int)op.copy_from.flags << std::dec << dendl;
+ result = -EINVAL;
+ break;
+ }
try {
decode(src_name, bp);
decode(src_oloc, bp);
+ // check if client sent us truncate_seq and truncate_size
+ if ((op.op == CEPH_OSD_OP_COPY_FROM2) &&
+ (op.copy_from.flags & CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ)) {
+ decode(truncate_seq, bp);
+ decode(truncate_size, bp);
+ have_truncate = true;
+ }
}
catch (buffer::error& e) {
result = -EINVAL;
break;
}
CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
+ if (have_truncate)
+ cb->set_truncate(truncate_seq, truncate_size);
ctx->op_finishers[ctx->current_osd_subop_num].reset(
new CopyFromFinisher(cb));
start_copy(cb, ctx->obc, src, src_oloc, src_version,
interval_set<uint64_t> ch;
ch.insert(0, oi.size);
ctx->modified_ranges.union_of(ch);
+ ctx->clean_regions.mark_data_region_dirty(0, oi.size);
}
+ ctx->clean_regions.mark_omap_dirty();
ctx->delta_stats.num_wr++;
if (soid.is_snap()) {
ceph_assert(ctx->obc->ssc->snapset.clone_overlap.count(soid.snap));
maybe_create_new_object(ctx, true);
ctx->delta_stats.num_bytes -= obs.oi.size;
ctx->delta_stats.num_bytes += rollback_to->obs.oi.size;
+ ctx->clean_regions.mark_data_region_dirty(0, std::max(obs.oi.size, rollback_to->obs.oi.size));
+ ctx->clean_regions.mark_omap_dirty();
obs.oi.size = rollback_to->obs.oi.size;
if (rollback_to->obs.oi.is_data_digest())
obs.oi.set_data_digest(rollback_to->obs.oi.data_digest);
if (snapc.seq > ctx->new_snapset.seq) {
// update snapset with latest snap context
ctx->new_snapset.seq = snapc.seq;
- ctx->new_snapset.snaps = snapc.snaps;
+ if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+ ctx->new_snapset.snaps = snapc.snaps;
+ } else {
+ ctx->new_snapset.snaps.clear();
+ }
}
dout(20) << "make_writeable " << soid
<< " done, snapset=" << ctx->new_snapset << dendl;
if (oi.has_manifest() && oi.manifest.is_chunked()) {
for (auto &p : oi.manifest.chunk_map) {
if ((p.first <= offset && p.first + p.second.length > offset) ||
- (p.first > offset && p.first <= offset + length)) {
+ (p.first > offset && p.first < offset + length)) {
p.second.clear_flag(chunk_info_t::FLAG_MISSING);
p.second.set_flag(chunk_info_t::FLAG_DIRTY);
}
p != ctx->notify_acks.end();
++p) {
if (p->watch_cookie)
- dout(10) << "notify_ack " << make_pair(p->watch_cookie.get(), p->notify_id) << dendl;
+ dout(10) << "notify_ack " << make_pair(*(p->watch_cookie), p->notify_id) << dendl;
else
dout(10) << "notify_ack " << make_pair("NULL", p->notify_id) << dendl;
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
++i) {
if (i->first.second != entity) continue;
if (p->watch_cookie &&
- p->watch_cookie.get() != i->first.first) continue;
+ *(p->watch_cookie) != i->first.first) continue;
dout(10) << "acking notify on watch " << i->first << dendl;
i->second->notify_ack(p->notify_id, p->reply_bl);
}
int result = do_osd_ops(ctx, *ctx->ops);
if (result < 0) {
if (ctx->op->may_write() &&
- get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+ get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
// need to save the error code in the pg log, to detect dup ops,
// but do nothing else
ctx->update_log_only = true;
if (ctx->pending_async_reads.empty())
unstable_stats.add(ctx->delta_stats);
if (ctx->op->may_write() &&
- get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+ get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
ctx->update_log_only = true;
}
return result;
// check for full
if ((ctx->delta_stats.num_bytes > 0 ||
ctx->delta_stats.num_objects > 0) && // FIXME: keys?
- (pool.info.has_flag(pg_pool_t::FLAG_FULL) ||
- get_osdmap()->test_flag(CEPH_OSDMAP_FULL))) {
- const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
+ pool.info.has_flag(pg_pool_t::FLAG_FULL)) {
+ auto m = ctx->op->get_req<MOSDOp>();
if (ctx->reqid.name.is_mds() || // FIXME: ignore MDS for now
m->has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
dout(20) << __func__ << " full, but proceeding due to FULL_FORCE or MDS"
finish_ctx(ctx,
ctx->new_obs.exists ? pg_log_entry_t::MODIFY :
- pg_log_entry_t::DELETE);
+ pg_log_entry_t::DELETE,
+ result);
return result;
}
-void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type)
+void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, int result)
{
const hobject_t& soid = ctx->obs->oi.soid;
dout(20) << __func__ << " " << soid << " " << ctx
}
// append to log
- ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
- ctx->obs->oi.version,
- ctx->user_at_version, ctx->reqid,
- ctx->mtime, 0));
+ ctx->log.push_back(
+ pg_log_entry_t(log_op_type, soid, ctx->at_version,
+ ctx->obs->oi.version,
+ ctx->user_at_version, ctx->reqid,
+ ctx->mtime,
+ (ctx->op && ctx->op->allows_returnvec()) ? result : 0));
+ if (ctx->op && ctx->op->allows_returnvec()) {
+ // also the per-op values
+ ctx->log.back().set_op_returns(*ctx->ops);
+ dout(20) << __func__ << " op_returns " << ctx->log.back().op_returns
+ << dendl;
+ }
+
+ ctx->log.back().clean_regions = ctx->clean_regions;
+ dout(20) << __func__ << " object " << soid << " marks clean_regions " << ctx->log.back().clean_regions << dendl;
+
if (soid.snap < CEPH_NOSNAP) {
switch (log_op_type) {
case pg_log_entry_t::MODIFY:
const hobject_t &soid,
const object_stat_sum_t &delta_stats) {
- info.stats.stats.add(delta_stats);
- info.stats.stats.floor(0);
-
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ recovery_state.apply_op_stats(soid, delta_stats);
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
pg_shard_t bt = *i;
- pg_info_t& pinfo = peer_info[bt];
- if (soid <= pinfo.last_backfill)
- pinfo.stats.stats.add(delta_stats);
- else if (soid <= last_backfill_started)
+ const pg_info_t& pinfo = recovery_state.get_peer_info(bt);
+ if (soid > pinfo.last_backfill && soid <= last_backfill_started) {
pending_backfill_updates[soid].stats.add(delta_stats);
+ }
}
if (is_primary() && scrubber.active) {
void PrimaryLogPG::complete_read_ctx(int result, OpContext *ctx)
{
- const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
+ auto m = ctx->op->get_req<MOSDOp>();
ceph_assert(ctx->async_reads_complete());
for (vector<OSDOp>::iterator p = ctx->ops->begin();
}
ctx->bytes_read += p->outdata.length();
}
- ctx->reply->claim_op_out_data(*ctx->ops);
ctx->reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
MOSDOpReply *reply = ctx->reply;
void finish(int r) override {
if (r == -ECANCELED)
return;
- pg->lock();
+ std::scoped_lock l{*pg};
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->process_copy_chunk(oid, tid, r);
cop.reset();
}
- pg->unlock();
}
};
void finish(int r) override {
if (r == -ECANCELED)
return;
- pg->lock();
+ std::scoped_lock l{*pg};
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->process_copy_chunk_manifest(oid, tid, r, offset);
cop.reset();
}
- pg->unlock();
}
};
if (cursor.is_complete()) {
// include reqids only in the final step. this is a bit fragile
// but it works...
- pg_log.get_log().get_object_reqids(ctx->obc->obs.oi.soid, 10,
+ recovery_state.get_pg_log().get_log().get_object_reqids(ctx->obc->obs.oi.soid, 10,
&reply_obj.reqids,
&reply_obj.reqid_return_codes);
dout(20) << " got reqids" << dendl;
void PrimaryLogPG::fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
OSDOp& osd_op)
{
- // NOTE: we take non-const ref here for claim_op_out_data below; we must
- // be careful not to modify anything else that will upset a racing
- // operator<<
- MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
+ const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
uint64_t features = m->get_features();
object_copy_data_t reply_obj;
- pg_log.get_log().get_object_reqids(oid, 10, &reply_obj.reqids,
+ recovery_state.get_pg_log().get_log().get_object_reqids(oid, 10, &reply_obj.reqids,
&reply_obj.reqid_return_codes);
dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl;
encode(reply_obj, osd_op.outdata, features);
osd_op.rval = -ENOENT;
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap_epoch(), 0, false);
- reply->claim_op_out_data(m->ops);
reply->set_result(-ENOENT);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->send_message_osd_client(reply, m->get_connection());
C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid,
get_last_peering_reset(), cop);
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
gather.set_finisher(new C_OnFinisher(fin,
- osd->objecter_finishers[n]));
+ osd->get_objecter_finisher(get_pg_shard())));
ceph_tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
cop->src.snap, NULL,
C_CopyChunk *fin = new C_CopyChunk(this, obc->obs.oi.soid,
get_last_peering_reset(), cop);
fin->offset = obj_offset;
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
-
- ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, op,
- sub_cop->src.snap, NULL,
- flags,
- new C_OnFinisher(fin, osd->objecter_finishers[n]),
- // discover the object version if we don't know it yet
- sub_cop->results.user_version ? NULL : &sub_cop->results.user_version);
+
+ ceph_tid_t tid = osd->objecter->read(
+ soid.oid, oloc, op,
+ sub_cop->src.snap, NULL,
+ flags,
+ new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())),
+ // discover the object version if we don't know it yet
+ sub_cop->results.user_version ? NULL : &sub_cop->results.user_version);
fin->tid = tid;
sub_cop->objecter_tid = tid;
if (last_offset < iter->first) {
// verify snap hasn't been deleted
vector<snapid_t>::iterator p = cop->results.snaps.begin();
while (p != cop->results.snaps.end()) {
- if (pool.info.is_removed_snap(*p)) {
+ // make best effort to sanitize snaps/clones.
+ if (get_osdmap()->in_removed_snaps_queue(info.pgid.pgid.pool(), *p)) {
dout(10) << __func__ << " clone snap " << *p << " has been deleted"
<< dendl;
for (vector<snapid_t>::iterator q = p + 1;
p.second->cursor.data_offset, sub_chunk.outdata.length());
obs.oi.manifest.chunk_map[p.second->cursor.data_offset].clear_flag(chunk_info_t::FLAG_DIRTY);
obs.oi.manifest.chunk_map[p.second->cursor.data_offset].clear_flag(chunk_info_t::FLAG_MISSING);
+ ctx->clean_regions.mark_data_region_dirty(p.second->cursor.data_offset, sub_chunk.outdata.length());
sub_chunk.outdata.clear();
}
obs.oi.clear_data_digest();
obs.oi.clear_omap_digest();
}
- obs.oi.truncate_seq = cb->results->truncate_seq;
- obs.oi.truncate_size = cb->results->truncate_size;
+ obs.oi.truncate_seq = cb->truncate_seq;
+ obs.oi.truncate_size = cb->truncate_size;
+
+ obs.oi.mtime = ceph::real_clock::to_timespec(cb->results->mtime);
+ ctx->mtime = utime_t();
ctx->extra_reqids = cb->results->reqids;
ctx->extra_reqid_return_codes = cb->results->reqid_return_codes;
if (cb->results->has_omap) {
dout(10) << __func__ << " setting omap flag on " << obs.oi.soid << dendl;
obs.oi.set_flag(object_info_t::FLAG_OMAP);
+ ctx->clean_regions.mark_omap_dirty();
} else {
dout(10) << __func__ << " clearing omap flag on " << obs.oi.soid << dendl;
obs.oi.clear_flag(object_info_t::FLAG_OMAP);
if (obs.oi.size > 0)
ch.insert(0, obs.oi.size);
ctx->modified_ranges.union_of(ch);
+ ctx->clean_regions.mark_data_region_dirty(0, std::max(obs.oi.size, cb->get_data_size()));
if (cb->get_data_size() != obs.oi.size) {
ctx->delta_stats.num_bytes -= obs.oi.size;
if (r != -ENOENT && soid.is_snap()) {
if (results->snaps.empty()) {
- // we must have read "snap" content from the head object in
- // the base pool. use snap_seq to construct what snaps should
- // be for this clone (what is was before we evicted the clean
- // clone from this pool, and what it will be when we flush and
- // the clone eventually happens in the base pool).
+ // we must have read "snap" content from the head object in the
+ // base pool. use snap_seq to construct what snaps should be
+ // for this clone (what is was before we evicted the clean clone
+ // from this pool, and what it will be when we flush and the
+ // clone eventually happens in the base pool). we want to use
+ // snaps in (results->snap_seq,soid.snap]
SnapSet& snapset = obc->ssc->snapset;
- vector<snapid_t>::iterator p = snapset.snaps.begin();
- while (p != snapset.snaps.end() && *p > soid.snap)
- ++p;
- while (p != snapset.snaps.end() && *p > results->snap_seq) {
- results->snaps.push_back(*p);
- ++p;
+ for (auto p = snapset.clone_snaps.rbegin();
+ p != snapset.clone_snaps.rend();
+ ++p) {
+ for (auto snap : p->second) {
+ if (snap > soid.snap) {
+ continue;
+ }
+ if (snap <= results->snap_seq) {
+ break;
+ }
+ results->snaps.push_back(snap);
+ }
}
}
OpContextUPtr tctx = simple_opc_create(obc);
tctx->at_version = get_next_version();
- filter_snapc(tctx->new_snapset.snaps);
+ if (get_osdmap()->require_osd_release < ceph_release_t::octopus) {
+ filter_snapc(tctx->new_snapset.snaps);
+ } else {
+ tctx->new_snapset.snaps.clear();
+ }
vector<snapid_t> new_clones;
map<snapid_t, vector<snapid_t>> new_clone_snaps;
for (vector<snapid_t>::iterator i = tctx->new_snapset.clones.begin();
}
tctx->new_obs.oi.size = results->object_size;
tctx->new_obs.oi.user_version = results->user_version;
+ tctx->new_obs.oi.mtime = ceph::real_clock::to_timespec(results->mtime);
+ tctx->mtime = utime_t();
if (results->is_data_digest()) {
tctx->new_obs.oi.set_data_digest(results->data_digest);
} else {
tctx->new_obs.oi.clear_data_digest();
}
+ if (results->object_size)
+ tctx->clean_regions.mark_data_region_dirty(0, results->object_size);
if (results->is_omap_digest()) {
tctx->new_obs.oi.set_omap_digest(results->omap_digest);
} else {
tctx->new_obs.oi.clear_omap_digest();
}
+ if (results->has_omap)
+ tctx->clean_regions.mark_omap_dirty();
tctx->new_obs.oi.truncate_seq = results->truncate_seq;
tctx->new_obs.oi.truncate_size = results->truncate_size;
ceph_assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
tctx->new_snapset.from_snap_set(
results->snapset,
- get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
+ get_osdmap()->require_osd_release < ceph_release_t::luminous);
}
dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
void finish(int r) override {
if (r == -ECANCELED)
return;
- pg->lock();
+ std::scoped_lock locker{*pg};
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_flush(oid, tid, r);
pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start);
}
- pg->unlock();
}
};
int PrimaryLogPG::start_flush(
OpRequestRef op, ObjectContextRef obc,
bool blocking, hobject_t *pmissing,
- boost::optional<std::function<void()>> &&on_flush)
+ std::optional<std::function<void()>> &&on_flush)
{
const object_info_t& oi = obc->obs.oi;
const hobject_t& soid = oi.soid;
<< " " << (blocking ? "blocking" : "non-blocking/best-effort")
<< dendl;
- // get a filtered snapset, need to remove removed snaps
- SnapSet snapset = obc->ssc->snapset.get_filtered(pool.info);
+ bool preoctopus_compat =
+ get_osdmap()->require_osd_release < ceph_release_t::octopus;
+ SnapSet snapset;
+ if (preoctopus_compat) {
+ // for pre-octopus compatibility, filter SnapSet::snaps. not
+ // certain we need this, but let's be conservative.
+ snapset = obc->ssc->snapset.get_filtered(pool.info);
+ } else {
+ // NOTE: change this to a const ref when we remove this compat code
+ snapset = obc->ssc->snapset;
+ }
// verify there are no (older) check for dirty clones
{
hobject_t next = soid;
next.snap = *p;
ceph_assert(next.snap < soid.snap);
- if (pg_log.get_missing().is_missing(next)) {
+ if (recovery_state.get_pg_log().get_missing().is_missing(next)) {
dout(10) << __func__ << " missing clone is " << next << dendl;
if (pmissing)
*pmissing = next;
SnapContext snapc, dsnapc;
if (snapset.seq != 0) {
if (soid.snap == CEPH_NOSNAP) {
- snapc.seq = snapset.seq;
- snapc.snaps = snapset.snaps;
+ snapc = snapset.get_ssc_as_of(snapset.seq);
} else {
snapid_t min_included_snap;
auto p = snapset.clone_snaps.find(soid.snap);
}
C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
ceph_tid_t tid = osd->objecter->mutate(
soid.oid, base_oloc, o, snapc,
ceph::real_clock::from_ceph_timespec(oi.mtime),
CEPH_OSD_FLAG_IGNORE_OVERLAY | CEPH_OSD_FLAG_ENFORCE_SNAPC,
new C_OnFinisher(fin,
- osd->objecter_finishers[n]));
+ osd->get_objecter_finisher(get_pg_shard())));
/* we're under the pg lock and fin->finish() is grabbing that */
fin->tid = tid;
fop->objecter_tid = tid;
flush_ops[soid] = fop;
- info.stats.stats.sum.num_flush++;
- info.stats.stats.sum.num_flush_kb += shift_round_up(oi.size, 10);
+
+ recovery_state.update_stats(
+ [&oi](auto &history, auto &stats) {
+ stats.stats.sum.num_flush++;
+ stats.stats.sum.num_flush_kb += shift_round_up(oi.size, 10);
+ return false;
+ });
return -EINPROGRESS;
}
}
if (fop->on_flush) {
(*(fop->on_flush))();
- fop->on_flush = boost::none;
+ fop->on_flush = std::nullopt;
}
flush_ops.erase(oid);
return;
}
if (fop->on_flush) {
(*(fop->on_flush))();
- fop->on_flush = boost::none;
+ fop->on_flush = std::nullopt;
}
flush_ops.erase(oid);
if (fop->blocking)
osd->logger->inc(l_osd_tier_clean);
if (fop->on_flush) {
(*(fop->on_flush))();
- fop->on_flush = boost::none;
+ fop->on_flush = std::nullopt;
}
flush_ops.erase(oid);
return 0;
// try to take the lock manually, since we don't
// have a ctx yet.
if (ctx->lock_manager.get_lock_type(
- ObjectContext::RWState::RWWRITE,
+ RWState::RWWRITE,
oid,
obc,
fop->op)) {
// fop->op is now waiting on the lock; get fop->dup_ops to wait too.
for (auto op : fop->dup_ops) {
bool locked = ctx->lock_manager.get_lock_type(
- ObjectContext::RWState::RWWRITE,
+ RWState::RWWRITE,
oid,
obc,
op);
if (fop->on_flush) {
ctx->register_on_finish(*(fop->on_flush));
- fop->on_flush = boost::none;
+ fop->on_flush = std::nullopt;
}
ctx->at_version = get_next_version();
t->omap_clear(oid);
ctx->new_obs.oi.clear_omap_digest();
ctx->new_obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+ ctx->clean_regions.mark_omap_dirty();
}
if (obc->obs.oi.size == chunks_size) {
t->truncate(oid, 0);
truncate_update_size_and_usage(ctx->delta_stats,
ctx->new_obs.oi,
0);
+ ctx->clean_regions.mark_data_region_dirty(0, ctx->new_obs.oi.size);
ctx->new_obs.oi.new_object();
for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
p.second.clear_flag(chunk_info_t::FLAG_DIRTY);
}
if (fop->on_flush) {
(*(fop->on_flush))();
- fop->on_flush = boost::none;
+ fop->on_flush = std::nullopt;
}
flush_ops.erase(fop->obc->obs.oi.soid);
}
repop->all_committed = true;
if (!repop->rep_aborted) {
if (repop->v != eversion_t()) {
- last_update_ondisk = repop->v;
- last_complete_ondisk = repop->pg_local_last_complete;
+ recovery_state.complete_write(repop->v, repop->pg_local_last_complete);
}
eval_repop(repop);
}
dout(10) << "op_applied version " << applied_version << dendl;
ceph_assert(applied_version != eversion_t());
ceph_assert(applied_version <= info.last_update);
- last_update_applied = applied_version;
+ recovery_state.local_write_applied(applied_version);
if (is_primary()) {
if (scrubber.active) {
- if (last_update_applied >= scrubber.subset_last_update) {
+ if (recovery_state.get_last_update_applied() >=
+ scrubber.subset_last_update) {
requeue_scrub(ops_blocked_by_scrub());
}
} else {
void PrimaryLogPG::eval_repop(RepGather *repop)
{
- const MOSDOp *m = NULL;
- if (repop->op)
- m = static_cast<const MOSDOp *>(repop->op->get_req());
-
- if (m)
- dout(10) << "eval_repop " << *repop << dendl;
- else
- dout(10) << "eval_repop " << *repop << " (no op)" << dendl;
+ dout(10) << "eval_repop " << *repop
+ << (repop->op && repop->op->get_req<MOSDOp>() ? "" : " (no op)") << dendl;
// ondisk?
if (repop->all_committed) {
return_code = std::get<2>(i);
}
osd->reply_op_error(std::get<0>(i), return_code, repop->v,
- std::get<1>(i));
+ std::get<1>(i), std::get<3>(i));
}
waiting_for_ondisk.erase(it);
}
publish_stats_to_osd();
- calc_min_last_complete_ondisk();
dout(10) << " removing " << *repop << dendl;
ceph_assert(!repop_queue.empty());
<< dendl;
repop->v = ctx->at_version;
- if (ctx->at_version > eversion_t()) {
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == get_primary()) continue;
- pg_info_t &pinfo = peer_info[*i];
- // keep peer_info up to date
- if (pinfo.last_complete == pinfo.last_update)
- pinfo.last_complete = ctx->at_version;
- pinfo.last_update = ctx->at_version;
- }
- }
ctx->op_t->add_obc(ctx->obc);
if (ctx->clone_obc) {
projected_log.add(entry);
}
- bool requires_missing_loc = false;
- for (set<pg_shard_t>::iterator i = async_recovery_targets.begin();
- i != async_recovery_targets.end();
- ++i) {
- if (*i == get_primary() || !peer_missing[*i].is_missing(soid)) continue;
- requires_missing_loc = true;
- for (auto &&entry: ctx->log) {
- peer_missing[*i].add_next_event(entry);
- }
- }
-
- if (requires_missing_loc) {
- for (auto &&entry: ctx->log) {
- dout(30) << __func__ << " missing_loc before: "
- << missing_loc.get_locations(entry.soid) << dendl;
- missing_loc.add_missing(entry.soid, entry.version,
- eversion_t(), entry.is_delete());
- // clear out missing_loc
- missing_loc.clear_location(entry.soid);
- for (auto &i: actingset) {
- if (!peer_missing[i].is_missing(entry.soid))
- missing_loc.add_location(entry.soid, i);
- }
- dout(30) << __func__ << " missing_loc after: "
- << missing_loc.get_locations(entry.soid) << dendl;
- }
- }
-
+ recovery_state.pre_submit_op(
+ soid,
+ ctx->log,
+ ctx->at_version);
pgbackend->submit_transaction(
soid,
ctx->delta_stats,
ctx->at_version,
std::move(ctx->op_t),
- pg_trim_to,
- min_last_complete_ondisk,
+ recovery_state.get_pg_trim_to(),
+ recovery_state.get_min_last_complete_ondisk(),
ctx->log,
ctx->updated_hset_history,
on_all_commit,
int r,
ObcLockManager &&manager,
OpRequestRef &&op,
- boost::optional<std::function<void(void)> > &&on_complete)
+ std::optional<std::function<void(void)> > &&on_complete)
{
RepGather *repop = new RepGather(
std::move(manager),
dout(20) << __func__ << " " << repop << dendl;
issue_repop(repop, ctx.get());
eval_repop(repop);
- if (hard_limit_pglog())
- calc_trim_to_aggressive();
- else
- calc_trim_to();
+ recovery_state.update_trim_to();
repop->put();
}
void PrimaryLogPG::submit_log_entries(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
- boost::optional<std::function<void(void)> > &&_on_complete,
+ std::optional<std::function<void(void)> > &&_on_complete,
OpRequestRef op,
int r)
{
}
boost::intrusive_ptr<RepGather> repop;
- boost::optional<std::function<void(void)> > on_complete;
- if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
+ std::optional<std::function<void(void)> > on_complete;
+ if (get_osdmap()->require_osd_release >= ceph_release_t::jewel) {
repop = new_repop(
version,
r,
[this, entries, repop, on_complete]() {
ObjectStore::Transaction t;
eversion_t old_last_update = info.last_update;
- merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
-
+ recovery_state.merge_new_log_entries(
+ entries, t, recovery_state.get_pg_trim_to(),
+ recovery_state.get_min_last_complete_ondisk());
set<pg_shard_t> waiting_on;
- for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
+ for (set<pg_shard_t>::const_iterator i = get_acting_recovery_backfill().begin();
+ i != get_acting_recovery_backfill().end();
++i) {
pg_shard_t peer(*i);
if (peer == pg_whoami) continue;
- ceph_assert(peer_missing.count(peer));
- ceph_assert(peer_info.count(peer));
- if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
+ ceph_assert(recovery_state.get_peer_missing().count(peer));
+ ceph_assert(recovery_state.has_peer_info(peer));
+ if (get_osdmap()->require_osd_release >= ceph_release_t::jewel) {
ceph_assert(repop);
MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
entries,
spg_t(info.pgid.pgid, i->shard),
pg_whoami.shard,
get_osdmap_epoch(),
- last_peering_reset,
+ get_last_peering_reset(),
repop->rep_tid,
- pg_trim_to,
- min_last_complete_ondisk);
+ recovery_state.get_pg_trim_to(),
+ recovery_state.get_min_last_complete_ondisk());
osd->send_message_osd_cluster(
peer.osd, m, get_osdmap_epoch());
waiting_on.insert(peer);
MOSDPGLog *m = new MOSDPGLog(
peer.shard, pg_whoami.shard,
info.last_update.epoch,
- info, last_peering_reset);
+ info, get_last_peering_reset());
m->log.log = entries;
m->log.tail = old_last_update;
m->log.head = info.last_update;
epoch_t epoch)
: pg(pg), rep_tid(rep_tid), epoch(epoch) {}
void finish(int) override {
- pg->lock();
+ std::scoped_lock l{*pg};
if (!pg->pg_has_reset_since(epoch)) {
auto it = pg->log_entry_update_waiting_on.find(rep_tid);
ceph_assert(it != pg->log_entry_update_waiting_on.end());
pg->log_entry_update_waiting_on.erase(it);
}
}
- pg->unlock();
}
};
t.register_on_commit(
op_applied(info.last_update);
});
- if (hard_limit_pglog())
- calc_trim_to_aggressive();
- else
- calc_trim_to();
+ recovery_state.update_trim_to();
}
void PrimaryLogPG::cancel_log_updates()
void PrimaryLogPG::get_watchers(list<obj_watch_item_t> *ls)
{
- lock();
+ std::scoped_lock l{*this};
pair<hobject_t, ObjectContextRef> i;
while (object_contexts.get_next(i.first, &i)) {
ObjectContextRef obc(i.second);
get_obc_watchers(obc, *ls);
}
- unlock();
}
void PrimaryLogPG::get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers)
void PrimaryLogPG::populate_obc_watchers(ObjectContextRef obc)
{
ceph_assert(is_active());
- auto it_objects = pg_log.get_log().objects.find(obc->obs.oi.soid);
+ auto it_objects = recovery_state.get_pg_log().get_log().objects.find(obc->obs.oi.soid);
ceph_assert((recovering.count(obc->obs.oi.soid) ||
!is_missing_object(obc->obs.oi.soid)) ||
- (it_objects != pg_log.get_log().objects.end() && // or this is a revert... see recover_primary()
+ (it_objects != recovery_state.get_pg_log().get_log().objects.end() && // or this is a revert... see recover_primary()
it_objects->second->op ==
pg_log_entry_t::LOST_REVERT &&
it_objects->second->reverting_to ==
bool can_create,
const map<string, bufferlist> *attrs)
{
- auto it_objects = pg_log.get_log().objects.find(soid);
+ auto it_objects = recovery_state.get_pg_log().get_log().objects.find(soid);
ceph_assert(
- attrs || !pg_log.get_missing().is_missing(soid) ||
+ attrs || !recovery_state.get_pg_log().get_missing().is_missing(soid) ||
// or this is a revert... see recover_primary()
- (it_objects != pg_log.get_log().objects.end() &&
+ (it_objects != recovery_state.get_pg_log().get_log().objects.end() &&
it_objects->second->op ==
pg_log_entry_t::LOST_REVERT));
ObjectContextRef obc = object_contexts.lookup(soid);
return 0;
}
- hobject_t head = oid.get_head();
-
// we want a snap
- if (!map_snapid_to_clone && pool.info.is_removed_snap(oid.snap)) {
- dout(10) << __func__ << " snap " << oid.snap << " is removed" << dendl;
- return -ENOENT;
- }
+ hobject_t head = oid.get_head();
SnapSetContext *ssc = get_snapset_context(oid, can_create);
if (!ssc || !(ssc->exists || can_create)) {
dout(20) << __func__ << " " << oid << " no snapset" << dendl;
<< " snapset " << ssc->snapset
<< " maps to " << oid << dendl;
- if (pg_log.get_missing().is_missing(oid)) {
+ if (recovery_state.get_pg_log().get_missing().is_missing(oid)) {
dout(10) << __func__ << " " << oid << " @" << oid.snap
<< " snapset " << ssc->snapset
<< " " << oid << " is missing" << dendl;
hobject_t soid(oid.oid, oid.get_key(), ssc->snapset.clones[k], oid.get_hash(),
info.pgid.pool(), oid.get_namespace());
- if (pg_log.get_missing().is_missing(soid)) {
+ if (recovery_state.get_pg_log().get_missing().is_missing(soid)) {
dout(20) << __func__ << " " << soid << " missing, try again later"
<< dendl;
if (pmissing)
if (pmissing)
*pmissing = soid;
put_snapset_context(ssc);
- if (is_degraded_or_backfilling_object(soid)) {
- dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
- return -EAGAIN;
- } else if (is_degraded_on_async_recovery_target(soid)) {
- dout(20) << __func__ << " clone is recovering " << soid << dendl;
- return -EAGAIN;
+ if (is_primary()) {
+ if (is_degraded_or_backfilling_object(soid)) {
+ dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
+ return -EAGAIN;
+ } else if (is_degraded_on_async_recovery_target(soid)) {
+ dout(20) << __func__ << " clone is recovering " << soid << dendl;
+ return -EAGAIN;
+ } else {
+ dout(20) << __func__ << " missing clone " << soid << dendl;
+ return -ENOENT;
+ }
} else {
- dout(20) << __func__ << " missing clone " << soid << dendl;
+ dout(20) << __func__ << " replica missing clone" << soid << dendl;
return -ENOENT;
}
}
ceph_assert(!cct->_conf->osd_debug_verify_snaps);
return -ENOENT;
}
- first = p->second.back();
- last = p->second.front();
- if (first <= oid.snap) {
- dout(20) << __func__ << " " << soid << " [" << first << "," << last
- << "] contains " << oid.snap << " -- HIT " << obc->obs << dendl;
- *pobc = obc;
- return 0;
- } else {
- dout(20) << __func__ << " " << soid << " [" << first << "," << last
- << "] does not contain " << oid.snap << " -- DNE" << dendl;
+ if (std::find(p->second.begin(), p->second.end(), oid.snap) ==
+ p->second.end()) {
+ dout(20) << __func__ << " " << soid << " clone_snaps " << p->second
+ << " does not contain " << oid.snap << " -- DNE" << dendl;
+ return -ENOENT;
+ }
+ if (get_osdmap()->in_removed_snaps_queue(info.pgid.pgid.pool(), oid.snap)) {
+ dout(20) << __func__ << " " << soid << " snap " << oid.snap
+ << " in removed_snaps_queue" << " -- DNE" << dendl;
return -ENOENT;
}
+ dout(20) << __func__ << " " << soid << " clone_snaps " << p->second
+ << " contains " << oid.snap << " -- HIT " << obc->obs << dendl;
+ *pobc = obc;
+ return 0;
}
void PrimaryLogPG::object_context_destructor_callback(ObjectContext *obc)
int priority,
PGBackend::RecoveryHandle *h)
{
- if (missing_loc.is_unfound(soid)) {
+ if (recovery_state.get_missing_loc().is_unfound(soid)) {
dout(7) << __func__ << " " << soid
<< " v " << v
<< " but it is unfound" << dendl;
return PULL_NONE;
}
- if (missing_loc.is_deleted(soid)) {
+ if (recovery_state.get_missing_loc().is_deleted(soid)) {
start_recovery_op(soid);
ceph_assert(!recovering.count(soid));
recovering.insert(make_pair(soid, ObjectContextRef()));
epoch_t cur_epoch = get_osdmap_epoch();
- remove_missing_object(soid, v, new FunctionContext(
+ remove_missing_object(soid, v, new LambdaContext(
[=](int) {
- lock();
+ std::scoped_lock locker{*this};
if (!pg_has_reset_since(cur_epoch)) {
bool object_missing = false;
- for (const auto& shard : acting_recovery_backfill) {
+ for (const auto& shard : get_acting_recovery_backfill()) {
if (shard == pg_whoami)
continue;
- if (peer_missing[shard].is_missing(soid)) {
+ if (recovery_state.get_peer_missing(shard).is_missing(soid)) {
dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
object_missing = true;
break;
pgbackend->run_recovery_op(recovery_handle, priority);
}
}
- unlock();
}));
return PULL_YES;
}
if (soid.snap && soid.snap < CEPH_NOSNAP) {
// do we have the head?
hobject_t head = soid.get_head();
- if (pg_log.get_missing().is_missing(head)) {
+ if (recovery_state.get_pg_log().get_missing().is_missing(head)) {
if (recovering.count(head)) {
dout(10) << " missing but already recovering head " << head << dendl;
return PULL_NONE;
} else {
int r = recover_missing(
- head, pg_log.get_missing().get_items().find(head)->second.need, priority,
+ head, recovery_state.get_pg_log().get_missing().get_items().find(head)->second.need, priority,
h);
if (r != PULL_NONE)
return PULL_HEAD;
recovery_info.version = v;
epoch_t cur_epoch = get_osdmap_epoch();
- t.register_on_complete(new FunctionContext(
+ t.register_on_complete(new LambdaContext(
[=](int) {
- lock();
+ std::unique_lock locker{*this};
if (!pg_has_reset_since(cur_epoch)) {
ObjectStore::Transaction t2;
on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
t2.register_on_complete(on_complete);
int r = osd->store->queue_transaction(ch, std::move(t2), nullptr);
ceph_assert(r == 0);
- unlock();
+ locker.unlock();
} else {
- unlock();
+ locker.unlock();
on_complete->complete(-EAGAIN);
}
}));
void PrimaryLogPG::_committed_pushed_object(
epoch_t epoch, eversion_t last_complete)
{
- lock();
+ std::scoped_lock locker{*this};
if (!pg_has_reset_since(epoch)) {
- dout(10) << __func__ << " last_complete " << last_complete << " now ondisk" << dendl;
- last_complete_ondisk = last_complete;
-
- if (last_complete_ondisk == info.last_update) {
- if (!is_primary()) {
- // Either we are a replica or backfill target.
- // we are fully up to date. tell the primary!
- osd->send_message_osd_cluster(
- get_primary().osd,
- new MOSDPGTrim(
- get_osdmap_epoch(),
- spg_t(info.pgid.pgid, get_primary().shard),
- last_complete_ondisk),
- get_osdmap_epoch());
- } else {
- calc_min_last_complete_ondisk();
- }
- }
-
+ recovery_state.recovery_committed_to(last_complete);
} else {
- dout(10) << __func__ << " pg has changed, not touching last_complete_ondisk" << dendl;
+ dout(10) << __func__
+ << " pg has changed, not touching last_complete_ondisk" << dendl;
}
-
- unlock();
}
void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
--active_pushes;
// requeue an active chunky scrub waiting on recovery ops
- if (!deleting && active_pushes == 0
+ if (!recovery_state.is_deleting() && active_pushes == 0
&& scrubber.is_chunky_scrub_active()) {
requeue_scrub(ops_blocked_by_scrub());
}
--active_pushes;
// requeue an active chunky scrub waiting on recovery ops
- if (!deleting && active_pushes == 0 &&
+ if (!recovery_state.is_deleting() && active_pushes == 0 &&
scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->chunky) {
auto& op = scrubber.active_rep_scrub;
osd->enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
op->get_req()->get_cost(),
op->get_req()->get_priority(),
op->get_req()->get_recv_stamp(),
}
}
-void PrimaryLogPG::recover_got(hobject_t oid, eversion_t v)
-{
- dout(10) << "got missing " << oid << " v " << v << dendl;
- pg_log.recover_got(oid, v, info);
- if (pg_log.get_log().log.empty()) {
- dout(10) << "last_complete now " << info.last_complete
- << " while log is empty" << dendl;
- } else if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) {
- dout(10) << "last_complete now " << info.last_complete
- << " log.complete_to " << pg_log.get_log().complete_to->version
- << dendl;
- } else {
- dout(10) << "last_complete now " << info.last_complete
- << " log.complete_to at end" << dendl;
- //below is not true in the repair case.
- //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong.
- ceph_assert(info.last_complete == info.last_update);
- }
-}
-
-void PrimaryLogPG::primary_failed(const hobject_t &soid)
-{
- list<pg_shard_t> fl = { pg_whoami };
- failed_push(fl, soid);
-}
-
-void PrimaryLogPG::failed_push(const list<pg_shard_t> &from,
- const hobject_t &soid, const eversion_t &need)
+void PrimaryLogPG::on_failed_pull(
+ const set<pg_shard_t> &from,
+ const hobject_t &soid,
+ const eversion_t &v)
{
dout(20) << __func__ << ": " << soid << dendl;
ceph_assert(recovering.count(soid));
}
recovering.erase(soid);
for (auto&& i : from) {
- missing_loc.remove_location(soid, i);
- if (need != eversion_t()) {
- dout(0) << __func__ << " adding " << soid << " to shard " << i
- << "'s missing set too" << dendl;
- auto pm = peer_missing.find(i);
- if (pm != peer_missing.end())
- pm->second.add(soid, need, eversion_t(), false);
+ if (i != pg_whoami) { // we'll get it below in primary_error
+ recovery_state.force_object_missing(i, soid, v);
}
}
+
dout(0) << __func__ << " " << soid << " from shard " << from
- << ", reps on " << missing_loc.get_locations(soid)
- << " unfound? " << missing_loc.is_unfound(soid) << dendl;
+ << ", reps on " << recovery_state.get_missing_loc().get_locations(soid)
+ << " unfound? " << recovery_state.get_missing_loc().is_unfound(soid)
+ << dendl;
finish_recovery_op(soid); // close out this attempt,
+ finish_degraded_object(soid);
+
+ if (from.count(pg_whoami)) {
+ dout(0) << " primary missing oid " << soid << " version " << v << dendl;
+ primary_error(soid, v);
+ backfills_in_flight.erase(soid);
+ }
}
eversion_t PrimaryLogPG::pick_newest_available(const hobject_t& oid)
{
eversion_t v;
pg_missing_item pmi;
- bool is_missing = pg_log.get_missing().is_missing(oid, &pmi);
+ bool is_missing = recovery_state.get_pg_log().get_missing().is_missing(oid, &pmi);
ceph_assert(is_missing);
v = pmi.have;
dout(10) << "pick_newest_available " << oid << " " << v << " on osd." << osd->whoami << " (local)" << dendl;
- ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
+ ceph_assert(!get_acting_recovery_backfill().empty());
+ for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
+ i != get_acting_recovery_backfill().end();
++i) {
if (*i == get_primary()) continue;
pg_shard_t peer = *i;
- if (!peer_missing[peer].is_missing(oid)) {
+ if (!recovery_state.get_peer_missing(peer).is_missing(oid)) {
continue;
}
- eversion_t h = peer_missing[peer].get_items().at(oid).have;
+ eversion_t h = recovery_state.get_peer_missing(peer).get_items().at(oid).have;
dout(10) << "pick_newest_available " << oid << " " << h << " on osd." << peer << dendl;
if (h > v)
v = h;
op->get_req());
ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
- boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ std::optional<eversion_t> op_trim_to, op_roll_forward_to;
if (m->pg_trim_to != eversion_t())
op_trim_to = m->pg_trim_to;
if (m->pg_roll_forward_to != eversion_t())
op_roll_forward_to = m->pg_roll_forward_to;
- dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+ dout(20) << __func__
+ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
- append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+ recovery_state.append_log_entries_update_missing(
+ m->entries, t, op_trim_to, op_roll_forward_to);
eversion_t new_lcod = info.last_complete;
- Context *complete = new FunctionContext(
+ Context *complete = new LambdaContext(
[=](int) {
const MOSDPGUpdateLogMissing *msg = static_cast<const MOSDPGUpdateLogMissing*>(
op->get_req());
- lock();
+ std::scoped_lock locker{*this};
if (!pg_has_reset_since(msg->get_epoch())) {
update_last_complete_ondisk(new_lcod);
MOSDPGUpdateLogMissingReply *reply =
reply->set_priority(CEPH_MSG_PRIO_HIGH);
msg->get_connection()->send_message(reply);
}
- unlock();
});
- if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+ if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
t.register_on_commit(complete);
} else {
/* Hack to work around the fact that ReplicatedBackend sends
*/
void PrimaryLogPG::mark_all_unfound_lost(
int what,
- ConnectionRef con,
- ceph_tid_t tid)
+ std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
list<hobject_t> oids;
dout(30) << __func__ << ": log before:\n";
- pg_log.get_log().print(*_dout);
+ recovery_state.get_pg_log().get_log().print(*_dout);
*_dout << dendl;
mempool::osd_pglog::list<pg_log_entry_t> log_entries;
utime_t mtime = ceph_clock_now();
map<hobject_t, pg_missing_item>::const_iterator m =
- missing_loc.get_needs_recovery().begin();
+ recovery_state.get_missing_loc().get_needs_recovery().begin();
map<hobject_t, pg_missing_item>::const_iterator mend =
- missing_loc.get_needs_recovery().end();
+ recovery_state.get_missing_loc().get_needs_recovery().end();
ObcLockManager manager;
eversion_t v = get_next_version();
v.epoch = get_osdmap_epoch();
- uint64_t num_unfound = missing_loc.num_unfound();
+ uint64_t num_unfound = recovery_state.get_missing_loc().num_unfound();
while (m != mend) {
const hobject_t &oid(m->first);
- if (!missing_loc.is_unfound(oid)) {
+ if (!recovery_state.get_missing_loc().is_unfound(oid)) {
// We only care about unfound objects
++m;
continue;
{
pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
0, osd_reqid_t(), mtime, 0);
- if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
+ if (get_osdmap()->require_osd_release >= ceph_release_t::jewel) {
if (pool.info.require_rollback()) {
e.mod_desc.try_rmobject(v.version);
} else {
}
}
- info.stats.stats_invalid = true;
+ recovery_state.update_stats(
+ [](auto &history, auto &stats) {
+ stats.stats_invalid = true;
+ return false;
+ });
submit_log_entries(
log_entries,
std::move(manager),
- boost::optional<std::function<void(void)> >(
- [this, oids, con, num_unfound, tid]() {
- if (perform_deletes_during_peering()) {
+ std::optional<std::function<void(void)> >(
+ [this, oids, num_unfound, on_finish]() {
+ if (recovery_state.perform_deletes_during_peering()) {
for (auto oid : oids) {
// clear old locations - merge_new_log_entries will have
// handled rebuilding missing_loc for each of these
// objects if we have the RECOVERY_DELETES flag
- missing_loc.recovered(oid);
+ recovery_state.object_recovered(oid, object_stat_sum_t());
}
}
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- DoRecovery())));
+ PeeringState::DoRecovery())));
} else if (is_backfill_unfound()) {
queue_peering_event(
PGPeeringEventRef(
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- RequestBackfill())));
+ PeeringState::RequestBackfill())));
} else {
queue_recovery();
}
string rs = ss.str();
dout(0) << "do_command r=" << 0 << " " << rs << dendl;
osd->clog->info() << rs;
- if (con) {
- MCommandReply *reply = new MCommandReply(0, rs);
- reply->set_tid(tid);
- con->send_message(reply);
- }
+ bufferlist empty;
+ on_finish(0, rs, empty);
}),
OpRequestRef());
}
void PrimaryLogPG::on_flushed()
{
- ceph_assert(flushes_in_progress > 0);
- flushes_in_progress--;
- if (flushes_in_progress == 0) {
- requeue_ops(waiting_for_flush);
- }
+ requeue_ops(waiting_for_flush);
if (!is_peered() || !is_primary()) {
pair<hobject_t, ObjectContextRef> i;
while (object_contexts.get_next(i.first, &i)) {
}
}
-void PrimaryLogPG::on_removal(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_removal(ObjectStore::Transaction &t)
{
dout(10) << __func__ << dendl;
- // adjust info to backfill
- info.set_last_backfill(hobject_t());
- pg_log.reset_backfill();
- dirty_info = true;
-
- // clear log
- PGLogEntryHandler rollbacker{this, t};
- pg_log.roll_forward(&rollbacker);
-
on_shutdown();
+
+ t.register_on_commit(new C_DeleteMore(this, get_osdmap_epoch()));
}
void PrimaryLogPG::clear_async_reads()
{
dout(10) << __func__ << dendl;
- // handles queue races
- deleting = true;
-
if (recovery_queued) {
recovery_queued = false;
osd->clear_queued_recovery(this);
cancel_copy_ops(false, &tids);
cancel_flush_ops(false, &tids);
cancel_proxy_ops(false, &tids);
+ cancel_manifest_ops(false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
apply_and_flush_repops(false);
}
}
-void PrimaryLogPG::on_activate()
+void PrimaryLogPG::on_activate_complete()
{
+ check_local();
+ // waiters
+ if (!recovery_state.needs_flush()) {
+ requeue_ops(waiting_for_peered);
+ } else if (!waiting_for_peered.empty()) {
+ dout(10) << __func__ << " flushes in progress, moving "
+ << waiting_for_peered.size()
+ << " items to waiting_for_flush"
+ << dendl;
+ ceph_assert(waiting_for_flush.empty());
+ waiting_for_flush.swap(waiting_for_peered);
+ }
+
+
// all clean?
if (needs_recovery()) {
dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- DoRecovery())));
+ PeeringState::DoRecovery())));
} else if (needs_backfill()) {
dout(10) << "activate queueing backfill" << dendl;
queue_peering_event(
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- RequestBackfill())));
+ PeeringState::RequestBackfill())));
} else {
dout(10) << "activate all replicas clean, no recovery" << dendl;
eio_errors_to_process = false;
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- AllReplicasRecovered())));
+ PeeringState::AllReplicasRecovered())));
}
publish_stats_to_osd();
- if (!backfill_targets.empty()) {
+ if (get_backfill_targets().size()) {
last_backfill_started = earliest_backfill();
new_backfill = true;
ceph_assert(!last_backfill_started.is_max());
- dout(5) << __func__ << ": bft=" << backfill_targets
+ dout(5) << __func__ << ": bft=" << get_backfill_targets()
<< " from " << last_backfill_started << dendl;
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
dout(5) << "target shard " << *i
- << " from " << peer_info[*i].last_backfill
+ << " from " << recovery_state.get_peer_info(*i).last_backfill
<< dendl;
}
}
agent_setup();
}
-void PrimaryLogPG::_on_new_interval()
-{
- dout(20) << __func__ << " checking missing set deletes flag. missing = " << pg_log.get_missing() << dendl;
- if (!pg_log.get_missing().may_include_deletes &&
- get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
- pg_log.rebuild_missing_set_with_deletes(osd->store, ch, info);
- }
- ceph_assert(pg_log.get_missing().may_include_deletes == get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
-}
-
-void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_change(ObjectStore::Transaction &t)
{
dout(10) << __func__ << dendl;
requeue_ops(waiting_for_peered);
requeue_ops(waiting_for_flush);
requeue_ops(waiting_for_active);
+ requeue_ops(waiting_for_readable);
clear_scrub_reserved();
cancel_copy_ops(is_primary(), &tids);
cancel_flush_ops(is_primary(), &tids);
cancel_proxy_ops(is_primary(), &tids);
+ cancel_manifest_ops(is_primary(), &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
// requeue object waiters
// registered watches.
context_registry_on_change();
- pgbackend->on_change_cleanup(t);
- scrubber.cleanup_store(t);
+ pgbackend->on_change_cleanup(&t);
+ scrubber.cleanup_store(&t);
pgbackend->on_change();
// clear snap_trimmer state
ceph_assert(objects_blocked_on_degraded_snap.empty());
}
-void PrimaryLogPG::on_role_change()
+void PrimaryLogPG::plpg_on_role_change()
{
dout(10) << __func__ << dendl;
if (get_role() != 0 && hit_set) {
}
}
-void PrimaryLogPG::on_pool_change()
+void PrimaryLogPG::plpg_on_pool_change()
{
dout(10) << __func__ << dendl;
// requeue cache full waiters just in case the cache_mode is
// clear state. called on recovery completion AND cancellation.
void PrimaryLogPG::_clear_recovery_state()
{
- missing_loc.clear();
#ifdef DEBUG_RECOVERY_OIDS
recovering_oids.clear();
#endif
waiting_for_unreadable_object.erase(soid);
}
if (is_missing_object(soid))
- pg_log.set_last_requested(0); // get recover_primary to start over
+ recovery_state.set_last_requested(0);
finish_degraded_object(soid);
}
void PrimaryLogPG::check_recovery_sources(const OSDMapRef& osdmap)
{
- /*
- * check that any peers we are planning to (or currently) pulling
- * objects from are dealt with.
- */
- missing_loc.check_recovery_sources(osdmap);
pgbackend->check_recovery_sources(osdmap);
-
- for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
- i != peer_log_requested.end();
- ) {
- if (!osdmap->is_up(i->osd)) {
- dout(10) << "peer_log_requested removing " << *i << dendl;
- peer_log_requested.erase(i++);
- } else {
- ++i;
- }
- }
-
- for (set<pg_shard_t>::iterator i = peer_missing_requested.begin();
- i != peer_missing_requested.end();
- ) {
- if (!osdmap->is_up(i->osd)) {
- dout(10) << "peer_missing_requested removing " << *i << dendl;
- peer_missing_requested.erase(i++);
- } else {
- ++i;
- }
- }
}
bool PrimaryLogPG::start_recovery_ops(
bool recovery_started = false;
ceph_assert(is_primary());
ceph_assert(is_peered());
- ceph_assert(!is_deleting());
+ ceph_assert(!recovery_state.is_deleting());
ceph_assert(recovery_queued);
recovery_queued = false;
return have_unfound();
}
- const auto &missing = pg_log.get_missing();
+ const auto &missing = recovery_state.get_pg_log().get_missing();
uint64_t num_unfound = get_num_unfound();
- if (!missing.have_missing()) {
- info.last_complete = info.last_update;
+ if (!recovery_state.have_missing()) {
+ recovery_state.local_recovery_complete();
}
if (!missing.have_missing() || // Primary does not have missing
- all_missing_unfound()) { // or all of the missing objects are unfound.
+ // or all of the missing objects are unfound.
+ recovery_state.all_missing_unfound()) {
// Recover the replicas.
started = recover_replicas(max, handle, &recovery_started);
}
bool deferred_backfill = false;
if (recovering.empty() &&
state_test(PG_STATE_BACKFILLING) &&
- !backfill_targets.empty() && started < max &&
+ !get_backfill_targets().empty() && started < max &&
missing.num_missing() == 0 &&
waiting_on_backfill.empty()) {
if (get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL)) {
!is_degraded()) {
dout(10) << "deferring backfill due to NOREBALANCE" << dendl;
deferred_backfill = true;
- } else if (!backfill_reserved) {
+ } else if (!recovery_state.is_backfill_reserved()) {
dout(10) << "deferring backfill due to !backfill_reserved" << dendl;
if (!backfill_reserving) {
dout(10) << "queueing RequestBackfill" << dendl;
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- RequestBackfill())));
+ PeeringState::RequestBackfill())));
}
deferred_backfill = true;
} else {
ceph_assert(recovery_ops_active == 0);
dout(10) << __func__ << " needs_recovery: "
- << missing_loc.get_needs_recovery()
+ << recovery_state.get_missing_loc().get_needs_recovery()
<< dendl;
dout(10) << __func__ << " missing_loc: "
- << missing_loc.get_missing_locs()
+ << recovery_state.get_missing_loc().get_missing_locs()
<< dendl;
int unfound = get_num_unfound();
if (unfound) {
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- RequestBackfill())));
+ PeeringState::RequestBackfill())));
} else {
dout(10) << "recovery done, no backfill" << dendl;
eio_errors_to_process = false;
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- AllReplicasRecovered())));
+ PeeringState::AllReplicasRecovered())));
}
} else { // backfilling
state_clear(PG_STATE_BACKFILLING);
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- Backfilled())));
+ PeeringState::Backfilled())));
}
return false;
{
ceph_assert(is_primary());
- const auto &missing = pg_log.get_missing();
+ const auto &missing = recovery_state.get_pg_log().get_missing();
dout(10) << __func__ << " recovering " << recovering.size()
<< " in pg,"
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
map<version_t, hobject_t>::const_iterator p =
- missing.get_rmissing().lower_bound(pg_log.get_log().last_requested);
+ missing.get_rmissing().lower_bound(recovery_state.get_pg_log().get_log().last_requested);
while (p != missing.get_rmissing().end()) {
handle.reset_tp_timeout();
hobject_t soid;
version_t v = p->first;
- auto it_objects = pg_log.get_log().objects.find(p->second);
- if (it_objects != pg_log.get_log().objects.end()) {
+ auto it_objects = recovery_state.get_pg_log().get_log().objects.find(p->second);
+ if (it_objects != recovery_state.get_pg_log().get_log().objects.end()) {
latest = it_objects->second;
ceph_assert(latest->is_update() || latest->is_delete());
soid = latest->soid;
ceph_assert(!pool.info.require_rollback());
t.setattr(coll, ghobject_t(soid), OI_ATTR, b2);
- recover_got(soid, latest->version);
- missing_loc.add_location(soid, pg_whoami);
+ recovery_state.recover_got(
+ soid,
+ latest->version,
+ false,
+ t);
++active_pushes;
eversion_t alternate_need = latest->reverting_to;
dout(10) << " need to pull prior_version " << alternate_need << " for revert " << item << dendl;
- for (map<pg_shard_t, pg_missing_t>::iterator p = peer_missing.begin();
- p != peer_missing.end();
- ++p)
+ set<pg_shard_t> good_peers;
+ for (auto p = recovery_state.get_peer_missing().begin();
+ p != recovery_state.get_peer_missing().end();
+ ++p) {
if (p->second.is_missing(soid, need) &&
p->second.get_items().at(soid).have == alternate_need) {
- missing_loc.add_location(soid, p->first);
+ good_peers.insert(p->first);
}
+ }
+ recovery_state.set_revert_with_targets(
+ soid,
+ good_peers);
dout(10) << " will pull " << alternate_need << " or " << need
- << " from one of " << missing_loc.get_locations(soid)
+ << " from one of "
+ << recovery_state.get_missing_loc().get_locations(soid)
<< dendl;
}
}
// only advance last_requested if we haven't skipped anything
if (!skipped)
- pg_log.set_last_requested(v);
+ recovery_state.set_last_requested(v);
}
pgbackend->run_recovery_op(h, get_recovery_op_priority());
bool PrimaryLogPG::primary_error(
const hobject_t& soid, eversion_t v)
{
- pg_log.missing_add(soid, v, eversion_t());
- pg_log.set_last_requested(0);
- missing_loc.remove_location(soid, pg_whoami);
- bool uhoh = true;
- ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- if (*i == get_primary()) continue;
- pg_shard_t peer = *i;
- if (!peer_missing[peer].is_missing(soid, v)) {
- missing_loc.add_location(soid, peer);
- dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
- << ", there should be a copy on shard " << peer << dendl;
- uhoh = false;
- }
- }
+ recovery_state.force_object_missing(pg_whoami, soid, v);
+ bool uhoh = recovery_state.get_missing_loc().is_unfound(soid);
if (uhoh)
- osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
+ osd->clog->error() << info.pgid << " missing primary copy of "
+ << soid << ", unfound";
else
- osd->clog->error() << info.pgid << " missing primary copy of " << soid
- << ", will try copies on " << missing_loc.get_locations(soid);
+ osd->clog->error() << info.pgid << " missing primary copy of "
+ << soid
+ << ", will try copies on "
+ << recovery_state.get_missing_loc().get_locations(soid);
return uhoh;
}
ceph_assert(is_primary());
dout(10) << __func__ << ": on " << soid << dendl;
+ if (soid.snap && soid.snap < CEPH_NOSNAP) {
+ // do we have the head and/or snapdir?
+ hobject_t head = soid.get_head();
+ if (recovery_state.get_pg_log().get_missing().is_missing(head)) {
+ if (recovering.count(head)) {
+ dout(10) << " missing but already recovering head " << head << dendl;
+ return 0;
+ } else {
+ int r = recover_missing(
+ head, recovery_state.get_pg_log().get_missing().get_items().find(head)->second.need,
+ get_recovery_op_priority(), h);
+ if (r != PULL_NONE)
+ return 1;
+ return 0;
+ }
+ }
+ }
+
// NOTE: we know we will get a valid oloc off of disk here.
ObjectContextRef obc = get_object_context(soid, false);
if (!obc) {
ceph_assert(!recovering.count(soid));
recovering.insert(make_pair(soid, obc));
- /* We need this in case there is an in progress write on the object. In fact,
- * the only possible write is an update to the xattr due to a lost_revert --
- * a client write would be blocked since the object is degraded.
- * In almost all cases, therefore, this lock should be uncontended.
- */
int r = pgbackend->recover_object(
soid,
v,
h);
if (r < 0) {
dout(0) << __func__ << " Error " << r << " on oid " << soid << dendl;
- primary_failed(soid);
- primary_error(soid, v);
+ on_failed_pull({ pg_whoami }, soid, v);
return 0;
}
return 1;
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
// this is FAR from an optimal recovery order. pretty lame, really.
- ceph_assert(!acting_recovery_backfill.empty());
+ ceph_assert(!get_acting_recovery_backfill().empty());
// choose replicas to recover, replica has the shortest missing list first
// so we can bring it back to normal ASAP
std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
async_by_num_missing;
- replicas_by_num_missing.reserve(acting_recovery_backfill.size() - 1);
- for (auto &p: acting_recovery_backfill) {
+ replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
+ for (auto &p: get_acting_recovery_backfill()) {
if (p == get_primary()) {
continue;
}
- auto pm = peer_missing.find(p);
- ceph_assert(pm != peer_missing.end());
+ auto pm = recovery_state.get_peer_missing().find(p);
+ ceph_assert(pm != recovery_state.get_peer_missing().end());
auto nm = pm->second.num_missing();
if (nm != 0) {
- if (async_recovery_targets.count(p)) {
+ if (is_async_recovery_target(p)) {
async_by_num_missing.push_back(make_pair(nm, p));
} else {
replicas_by_num_missing.push_back(make_pair(nm, p));
for (auto &replica: replicas_by_num_missing) {
pg_shard_t &peer = replica.second;
ceph_assert(peer != get_primary());
- map<pg_shard_t, pg_missing_t>::const_iterator pm = peer_missing.find(peer);
- ceph_assert(pm != peer_missing.end());
- map<pg_shard_t, pg_info_t>::const_iterator pi = peer_info.find(peer);
- ceph_assert(pi != peer_info.end());
+ auto pm = recovery_state.get_peer_missing().find(peer);
+ ceph_assert(pm != recovery_state.get_peer_missing().end());
size_t m_sz = pm->second.num_missing();
dout(10) << " peer osd." << peer << " missing " << m_sz << " objects." << dendl;
handle.reset_tp_timeout();
const hobject_t soid(p->second);
- if (missing_loc.is_unfound(soid)) {
+ if (recovery_state.get_missing_loc().is_unfound(soid)) {
dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
continue;
}
- if (soid > pi->second.last_backfill) {
+ const pg_info_t &pi = recovery_state.get_peer_info(peer);
+ if (soid > pi.last_backfill) {
if (!recovering.count(soid)) {
- derr << __func__ << ": object " << soid << " last_backfill " << pi->second.last_backfill << dendl;
+ derr << __func__ << ": object " << soid << " last_backfill "
+ << pi.last_backfill << dendl;
derr << __func__ << ": object added to missing set for backfill, but "
<< "is not in recovering, error!" << dendl;
ceph_abort();
continue;
}
- if (missing_loc.is_deleted(soid)) {
+ if (recovery_state.get_missing_loc().is_deleted(soid)) {
dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
started += prep_object_replica_deletes(soid, r->second.need, h, work_started);
continue;
}
- if (soid.is_snap() && pg_log.get_missing().is_missing(soid.get_head())) {
+ if (soid.is_snap() &&
+ recovery_state.get_pg_log().get_missing().is_missing(
+ soid.get_head())) {
dout(10) << __func__ << ": " << soid.get_head()
<< " still missing on primary" << dendl;
continue;
}
- if (pg_log.get_missing().is_missing(soid)) {
+ if (recovery_state.get_pg_log().get_missing().is_missing(soid)) {
dout(10) << __func__ << ": " << soid << " still missing on primary" << dendl;
continue;
}
hobject_t PrimaryLogPG::earliest_peer_backfill() const
{
hobject_t e = hobject_t::get_max();
- for (set<pg_shard_t>::const_iterator i = backfill_targets.begin();
- i != backfill_targets.end();
- ++i) {
- pg_shard_t peer = *i;
- map<pg_shard_t, BackfillInterval>::const_iterator iter =
- peer_backfill_info.find(peer);
+ for (const pg_shard_t& peer : get_backfill_targets()) {
+ const auto iter = peer_backfill_info.find(peer);
ceph_assert(iter != peer_backfill_info.end());
- if (iter->second.begin < e)
- e = iter->second.begin;
+ e = std::min(e, iter->second.begin);
}
return e;
}
// Primary hasn't got any more objects
ceph_assert(backfill_info.empty());
- for (set<pg_shard_t>::const_iterator i = backfill_targets.begin();
- i != backfill_targets.end();
- ++i) {
- pg_shard_t bt = *i;
- map<pg_shard_t, BackfillInterval>::const_iterator piter =
- peer_backfill_info.find(bt);
+ for (const pg_shard_t& bt : get_backfill_targets()) {
+ const auto piter = peer_backfill_info.find(bt);
ceph_assert(piter != peer_backfill_info.end());
const BackfillInterval& pbi = piter->second;
// See if peer has more to process
ThreadPool::TPHandle &handle, bool *work_started)
{
dout(10) << __func__ << " (" << max << ")"
- << " bft=" << backfill_targets
+ << " bft=" << get_backfill_targets()
<< " last_backfill_started " << last_backfill_started
<< (new_backfill ? " new_backfill":"")
<< dendl;
- ceph_assert(!backfill_targets.empty());
+ ceph_assert(!get_backfill_targets().empty());
// Initialize from prior backfill state
if (new_backfill) {
new_backfill = false;
// initialize BackfillIntervals
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
- peer_backfill_info[*i].reset(peer_info[*i].last_backfill);
+ peer_backfill_info[*i].reset(
+ recovery_state.get_peer_info(*i).last_backfill);
}
backfill_info.reset(last_backfill_started);
pending_backfill_updates.clear();
}
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
dout(10) << "peer osd." << *i
- << " info " << peer_info[*i]
+ << " info " << recovery_state.get_peer_info(*i)
<< " interval " << peer_backfill_info[*i].begin
<< "-" << peer_backfill_info[*i].end
<< " " << peer_backfill_info[*i].objects.size() << " objects"
vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
set<hobject_t> add_to_stat;
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
peer_backfill_info[*i].trim_to(
- std::max(peer_info[*i].last_backfill, last_backfill_started));
+ std::max(
+ recovery_state.get_peer_info(*i).last_backfill,
+ last_backfill_started));
}
backfill_info.trim_to(last_backfill_started);
dout(20) << " my backfill interval " << backfill_info << dendl;
bool sent_scan = false;
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
pg_shard_t bt = *i;
BackfillInterval& pbi = peer_backfill_info[bt];
dout(10) << " scanning peer osd." << bt << " from " << pbi.end << dendl;
epoch_t e = get_osdmap_epoch();
MOSDPGScan *m = new MOSDPGScan(
- MOSDPGScan::OP_SCAN_GET_DIGEST, pg_whoami, e, last_peering_reset,
+ MOSDPGScan::OP_SCAN_GET_DIGEST, pg_whoami, e, get_last_peering_reset(),
spg_t(info.pgid.pgid, bt.shard),
pbi.end, hobject_t());
osd->send_message_osd_cluster(bt.osd, m, get_osdmap_epoch());
if (check < backfill_info.begin) {
set<pg_shard_t> check_targets;
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
pg_shard_t bt = *i;
BackfillInterval& pbi = peer_backfill_info[bt];
eversion_t& obj_v = backfill_info.objects.begin()->second;
vector<pg_shard_t> need_ver_targs, missing_targs, keep_ver_targs, skip_targs;
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
pg_shard_t bt = *i;
BackfillInterval& pbi = peer_backfill_info[bt];
keep_ver_targs.push_back(bt);
}
} else {
- pg_info_t& pinfo = peer_info[bt];
+ const pg_info_t& pinfo = recovery_state.get_peer_info(bt);
// Only include peers that we've caught up to their backfill line
// otherwise, they only appear to be missing this object
}
dout(20) << "need_ver_targs=" << need_ver_targs
<< " keep_ver_targs=" << keep_ver_targs << dendl;
- dout(20) << "backfill_targets=" << backfill_targets
+ dout(20) << "backfill_targets=" << get_backfill_targets()
<< " missing_targs=" << missing_targs
<< " skip_targs=" << skip_targs << dendl;
pending_backfill_updates.erase(i++)) {
dout(20) << " pending_backfill_update " << i->first << dendl;
ceph_assert(i->first > new_last_backfill);
- for (set<pg_shard_t>::iterator j = backfill_targets.begin();
- j != backfill_targets.end();
- ++j) {
- pg_shard_t bt = *j;
- pg_info_t& pinfo = peer_info[bt];
- //Add stats to all peers that were missing object
- if (i->first > pinfo.last_backfill)
- pinfo.stats.add(i->second);
- }
+ recovery_state.update_complete_backfill_object_stats(
+ i->first,
+ i->second);
new_last_backfill = i->first;
}
dout(10) << "possible new_last_backfill at " << new_last_backfill << dendl;
// If new_last_backfill == MAX, then we will send OP_BACKFILL_FINISH to
// all the backfill targets. Otherwise, we will move last_backfill up on
// those targets need it and send OP_BACKFILL_PROGRESS to them.
- for (set<pg_shard_t>::iterator i = backfill_targets.begin();
- i != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ i != get_backfill_targets().end();
++i) {
pg_shard_t bt = *i;
- pg_info_t& pinfo = peer_info[bt];
+ const pg_info_t& pinfo = recovery_state.get_peer_info(bt);
if (new_last_backfill > pinfo.last_backfill) {
- pinfo.set_last_backfill(new_last_backfill);
+ recovery_state.update_peer_last_backfill(bt, new_last_backfill);
epoch_t e = get_osdmap_epoch();
MOSDPGBackfill *m = NULL;
if (pinfo.last_backfill.is_max()) {
m = new MOSDPGBackfill(
MOSDPGBackfill::OP_BACKFILL_FINISH,
e,
- last_peering_reset,
+ get_last_peering_reset(),
spg_t(info.pgid.pgid, bt.shard));
// Use default priority here, must match sub_op priority
- /* pinfo.stats might be wrong if we did log-based recovery on the
- * backfilled portion in addition to continuing backfill.
- */
- pinfo.stats = info.stats;
start_recovery_op(hobject_t::get_max());
} else {
m = new MOSDPGBackfill(
MOSDPGBackfill::OP_BACKFILL_PROGRESS,
e,
- last_peering_reset,
+ get_last_peering_reset(),
spg_t(info.pgid.pgid, bt.shard));
// Use default priority here, must match sub_op priority
}
ceph_assert(!peers.empty());
backfills_in_flight.insert(oid);
- for (unsigned int i = 0 ; i < peers.size(); ++i) {
- map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
- ceph_assert(bpm != peer_missing.end());
- bpm->second.add(oid, eversion_t(), eversion_t(), false);
- }
+ recovery_state.prepare_backfill_for_missing(oid, v, peers);
ceph_assert(!recovering.count(oid));
start_recovery_op(oid);
recovering.insert(make_pair(oid, obc));
- // We need to take the read_lock here in order to flush in-progress writes
int r = pgbackend->recover_object(
oid,
v,
h);
if (r < 0) {
dout(0) << __func__ << " Error " << r << " on oid " << oid << dendl;
- primary_failed(oid);
- primary_error(oid, v);
- backfills_in_flight.erase(oid);
- missing_loc.add_missing(oid, v, eversion_t());
+ on_failed_pull({ pg_whoami }, oid, v);
}
return r;
}
dout(10) << __func__<< ": bi is current " << dendl;
ceph_assert(bi->version == projected_last_update);
} else if (bi->version >= info.log_tail) {
- if (pg_log.get_log().empty() && projected_log.empty()) {
+ if (recovery_state.get_pg_log().get_log().empty() && projected_log.empty()) {
/* Because we don't move log_tail on split, the log might be
* empty even if log_tail != last_update. However, the only
* way to get here with an empty log is if log_tail is actually
}
};
dout(10) << "scanning pg log first" << dendl;
- pg_log.get_log().scan_log_after(bi->version, func);
+ recovery_state.get_pg_log().get_log().scan_log_after(bi->version, func);
dout(10) << "scanning projected log" << dendl;
projected_log.scan_log_after(bi->version, func);
bi->version = projected_last_update;
{
dout(10) << __func__ << dendl;
- ceph_assert(info.last_update >= pg_log.get_tail()); // otherwise we need some help!
+ ceph_assert(
+ info.last_update >=
+ recovery_state.get_pg_log().get_tail()); // otherwise we need some help!
if (!cct->_conf->osd_debug_verify_stray_on_activate)
return;
// just scan the log.
set<hobject_t> did;
- for (list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
- p != pg_log.get_log().log.rend();
+ for (list<pg_log_entry_t>::const_reverse_iterator p = recovery_state.get_pg_log().get_log().log.rbegin();
+ p != recovery_state.get_pg_log().get_log().log.rend();
++p) {
if (did.count(p->soid))
continue;
ostringstream ss;
ss << "hit_set_" << info.pgid.pgid << "_archive_";
if (using_gmt) {
- start.gmtime(ss) << "_";
- end.gmtime(ss);
+ start.gmtime(ss, true /* legacy pre-octopus form */) << "_";
+ end.gmtime(ss, true /* legacy pre-octopus form */);
} else {
- start.localtime(ss) << "_";
- end.localtime(ss);
+ start.localtime(ss, true /* legacy pre-octopus form */) << "_";
+ end.localtime(ss, true /* legacy pre-octopus form */);
}
hobject_t hoid(sobject_t(ss.str(), CEPH_NOSNAP), "",
info.pgid.ps(), info.pgid.pool(),
void PrimaryLogPG::hit_set_remove_all()
{
// If any archives are degraded we skip this
- for (list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
+ for (auto p = info.hit_set.history.begin();
p != info.hit_set.history.end();
++p) {
hobject_t aoid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt);
}
if (!info.hit_set.history.empty()) {
- list<pg_hit_set_info_t>::reverse_iterator p = info.hit_set.history.rbegin();
+ auto p = info.hit_set.history.rbegin();
ceph_assert(p != info.hit_set.history.rend());
hobject_t oid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt);
ceph_assert(!is_degraded_or_backfilling_object(oid));
simple_opc_submit(std::move(ctx));
}
- info.hit_set = pg_hit_set_history_t();
+ recovery_state.update_hset(pg_hit_set_history_t());
if (agent_state) {
agent_state->discard_hit_sets();
}
}
dout(20) << __func__ << " " << to << " .. " << info.last_update << dendl;
- list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
- while (p != pg_log.get_log().log.rend() && p->version > to)
+ list<pg_log_entry_t>::const_reverse_iterator p =
+ recovery_state.get_pg_log().get_log().log.rbegin();
+ while (p != recovery_state.get_pg_log().get_log().log.rend() && p->version > to)
++p;
- while (p != pg_log.get_log().log.rend() && p->version > from) {
+ while (p != recovery_state.get_pg_log().get_log().log.rend() && p->version > from) {
hit_set->insert(p->soid);
++p;
}
// If any archives are degraded we skip this persist request
// account for the additional entry being added below
- for (list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
+ for (auto p = info.hit_set.history.begin();
p != info.hit_set.history.end();
++p) {
hobject_t aoid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt);
// look just at that. This is necessary because our transactions
// may include a modify of the new hit_set *and* a delete of the
// old one, and this may span the backfill boundary.
- for (set<pg_shard_t>::iterator p = backfill_targets.begin();
- p != backfill_targets.end();
+ for (set<pg_shard_t>::const_iterator p = get_backfill_targets().begin();
+ p != get_backfill_targets().end();
++p) {
- ceph_assert(peer_info.count(*p));
- const pg_info_t& pi = peer_info[*p];
+ const pg_info_t& pi = recovery_state.get_peer_info(*p);
if (pi.last_backfill == hobject_t() ||
pi.last_backfill.get_hash() == info.pgid.ps()) {
dout(10) << __func__ << " backfill target osd." << *p
ctx->op_t->create(oid);
if (bl.length()) {
ctx->op_t->write(oid, 0, bl.length(), bl, 0);
+ write_update_size_and_usage(ctx->delta_stats, obc->obs.oi, ctx->modified_ranges,
+ 0, bl.length());
+ ctx->clean_regions.mark_data_region_dirty(0, bl.length());
}
map <string, bufferlist> attrs;
attrs[OI_ATTR].claim(boi);
ctx->mtime,
0)
);
+ ctx->log.back().clean_regions = ctx->clean_regions;
hit_set_trim(ctx, max);
// Return false if no objects operated on since start of object hash space
bool PrimaryLogPG::agent_work(int start_max, int agent_flush_quota)
{
- lock();
+ std::scoped_lock locker{*this};
if (!agent_state) {
dout(10) << __func__ << " no agent state, stopping" << dendl;
- unlock();
return true;
}
- ceph_assert(!deleting);
+ ceph_assert(!recovery_state.is_deleting());
if (agent_state->is_idle()) {
dout(10) << __func__ << " idle, stopping" << dendl;
- unlock();
return true;
}
if (need_delay) {
ceph_assert(agent_state->delaying == false);
agent_delay();
- unlock();
return false;
}
agent_choose_mode();
- unlock();
return true;
}
if (agent_state->hit_set_map.size() < info.hit_set.history.size()) {
dout(10) << __func__ << dendl;
- for (list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
+ for (auto p = info.hit_set.history.begin();
p != info.hit_set.history.end(); ++p) {
if (agent_state->hit_set_map.count(p->begin.sec()) == 0) {
dout(10) << __func__ << " loading " << p->begin << "-"
auto null_op_req = OpRequestRef();
if (!ctx->lock_manager.get_lock_type(
- ObjectContext::RWState::RWWRITE,
+ RWState::RWWRITE,
obc->obs.oi.soid,
obc,
null_op_req)) {
void PrimaryLogPG::agent_choose_mode_restart()
{
dout(20) << __func__ << dendl;
- lock();
+ std::scoped_lock locker{*this};
if (agent_state && agent_state->delaying) {
agent_state->delaying = false;
agent_choose_mode(true);
}
- unlock();
}
bool PrimaryLogPG::agent_choose_mode(bool restart, OpRequestRef op)
<< " -> "
<< TierAgentState::get_flush_mode_name(flush_mode)
<< dendl;
- if (flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
- osd->agent_inc_high_count();
- info.stats.stats.sum.num_flush_mode_high = 1;
- } else if (flush_mode == TierAgentState::FLUSH_MODE_LOW) {
- info.stats.stats.sum.num_flush_mode_low = 1;
- }
- if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
- osd->agent_dec_high_count();
- info.stats.stats.sum.num_flush_mode_high = 0;
- } else if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_LOW) {
- info.stats.stats.sum.num_flush_mode_low = 0;
- }
+ recovery_state.update_stats(
+ [=](auto &history, auto &stats) {
+ if (flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
+ osd->agent_inc_high_count();
+ stats.stats.sum.num_flush_mode_high = 1;
+ } else if (flush_mode == TierAgentState::FLUSH_MODE_LOW) {
+ stats.stats.sum.num_flush_mode_low = 1;
+ }
+ if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_HIGH) {
+ osd->agent_dec_high_count();
+ stats.stats.sum.num_flush_mode_high = 0;
+ } else if (agent_state->flush_mode == TierAgentState::FLUSH_MODE_LOW) {
+ stats.stats.sum.num_flush_mode_low = 0;
+ }
+ return false;
+ });
agent_state->flush_mode = flush_mode;
}
if (evict_mode != agent_state->evict_mode) {
requeue_op(op);
requeue_ops(waiting_for_flush);
requeue_ops(waiting_for_active);
+ requeue_ops(waiting_for_readable);
requeue_ops(waiting_for_scrub);
requeue_ops(waiting_for_cache_not_full);
objects_blocked_on_cache_full.clear();
requeued = true;
}
- if (evict_mode == TierAgentState::EVICT_MODE_SOME) {
- info.stats.stats.sum.num_evict_mode_some = 1;
- } else if (evict_mode == TierAgentState::EVICT_MODE_FULL) {
- info.stats.stats.sum.num_evict_mode_full = 1;
- }
- if (agent_state->evict_mode == TierAgentState::EVICT_MODE_SOME) {
- info.stats.stats.sum.num_evict_mode_some = 0;
- } else if (agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
- info.stats.stats.sum.num_evict_mode_full = 0;
- }
+ recovery_state.update_stats(
+ [=](auto &history, auto &stats) {
+ if (evict_mode == TierAgentState::EVICT_MODE_SOME) {
+ stats.stats.sum.num_evict_mode_some = 1;
+ } else if (evict_mode == TierAgentState::EVICT_MODE_FULL) {
+ stats.stats.sum.num_evict_mode_full = 1;
+ }
+ if (agent_state->evict_mode == TierAgentState::EVICT_MODE_SOME) {
+ stats.stats.sum.num_evict_mode_some = 0;
+ } else if (agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
+ stats.stats.sum.num_evict_mode_full = 0;
+ }
+ return false;
+ });
agent_state->evict_mode = evict_mode;
}
uint64_t old_effort = agent_state->evict_effort;
return true;
}
-bool PrimaryLogPG::already_ack(eversion_t v)
-{
- dout(20) << __func__ << ": " << v << dendl;
- for (xlist<RepGather*>::iterator i = repop_queue.begin();
- !i.end();
- ++i) {
- // skip copy from temp object ops
- if ((*i)->v == eversion_t()) {
- dout(20) << __func__ << ": " << **i
- << " version is empty" << dendl;
- continue;
- }
- if ((*i)->v > v) {
- dout(20) << __func__ << ": " << **i
- << " (*i)->v past v" << dendl;
- break;
- }
- }
- dout(20) << __func__ << ": returning true" << dendl;
- return true;
-}
-
// ==========================================================================================
// SCRUB
return true;
}
-static bool doing_clones(const boost::optional<SnapSet> &snapset,
+static bool doing_clones(const std::optional<SnapSet> &snapset,
const vector<snapid_t>::reverse_iterator &curclone) {
- return snapset && curclone != snapset.get().clones.rend();
+ return snapset && curclone != snapset->clones.rend();
}
void PrimaryLogPG::log_missing(unsigned missing,
- const boost::optional<hobject_t> &head,
+ const std::optional<hobject_t> &head,
LogChannelRef clog,
const spg_t &pgid,
const char *func,
{
ceph_assert(head);
if (allow_incomplete_clones) {
- dout(20) << func << " " << mode << " " << pgid << " " << head.get()
- << " skipped " << missing << " clone(s) in cache tier" << dendl;
+ dout(20) << func << " " << mode << " " << pgid << " " << *head
+ << " skipped " << missing << " clone(s) in cache tier" << dendl;
} else {
- clog->info() << mode << " " << pgid << " " << head.get()
- << " : " << missing << " missing clone(s)";
+ clog->info() << mode << " " << pgid << " " << *head
+ << " : " << missing << " missing clone(s)";
}
}
-unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
- const boost::optional<SnapSet> &snapset,
+unsigned PrimaryLogPG::process_clones_to(const std::optional<hobject_t> &head,
+ const std::optional<SnapSet> &snapset,
LogChannelRef clog,
const spg_t &pgid,
const char *mode,
bool allow_incomplete_clones,
- boost::optional<snapid_t> target,
+ std::optional<snapid_t> target,
vector<snapid_t>::reverse_iterator *curclone,
inconsistent_snapset_wrapper &e)
{
unsigned missing = 0;
// NOTE: clones are in descending order, thus **curclone > target test here
- hobject_t next_clone(head.get());
+ hobject_t next_clone(*head);
while(doing_clones(snapset, *curclone) && (!target || **curclone > *target)) {
++missing;
// it is okay to be missing one or more clones in a cache tier.
// skip higher-numbered clones in the list.
if (!allow_incomplete_clones) {
next_clone.snap = **curclone;
- clog->error() << mode << " " << pgid << " " << head.get()
+ clog->error() << mode << " " << pgid << " " << *head
<< " : expected clone " << next_clone << " " << missing
<< " missing";
++scrubber.shallow_errors;
void PrimaryLogPG::scrub_snapshot_metadata(
ScrubMap &scrubmap,
const map<hobject_t,
- pair<boost::optional<uint32_t>,
- boost::optional<uint32_t>>> &missing_digest)
+ pair<std::optional<uint32_t>,
+ std::optional<uint32_t>>> &missing_digest)
{
dout(10) << __func__ << dendl;
bool repair = state_test(PG_STATE_REPAIR);
bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
- boost::optional<snapid_t> all_clones; // Unspecified snapid_t or boost::none
+ std::optional<snapid_t> all_clones; // Unspecified snapid_t or std::nullopt
// traverse in reverse order.
- boost::optional<hobject_t> head;
- boost::optional<SnapSet> snapset; // If initialized so will head (above)
+ std::optional<hobject_t> head;
+ std::optional<SnapSet> snapset; // If initialized so will head (above)
vector<snapid_t>::reverse_iterator curclone; // Defined only if snapset initialized
unsigned missing = 0;
inconsistent_snapset_wrapper soid_error, head_error;
ceph_assert(!soid.is_snapdir());
soid_error = inconsistent_snapset_wrapper{soid};
object_stat_sum_t stat;
- boost::optional<object_info_t> oi;
+ std::optional<object_info_t> oi;
stat.num_objects++;
// basic checks.
if (p->second.attrs.count(OI_ATTR) == 0) {
- oi = boost::none;
+ oi = std::nullopt;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " : no '" << OI_ATTR << "' attr";
++scrubber.shallow_errors;
bv.push_back(p->second.attrs[OI_ATTR]);
try {
oi = object_info_t(); // Initialize optional<> before decode into it
- oi.get().decode(bv);
+ oi->decode(bv);
} catch (buffer::error& e) {
- oi = boost::none;
+ oi = std::nullopt;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " : can't decode '" << OI_ATTR << "' attr " << e.what();
++scrubber.shallow_errors;
++scrubber.shallow_errors;
}
- dout(20) << mode << " " << soid << " " << oi.get() << dendl;
+ dout(20) << mode << " " << soid << " " << *oi << dendl;
// A clone num_bytes will be added later when we have snapset
if (!soid.is_snap()) {
// Check for any problems while processing clones
if (doing_clones(snapset, curclone)) {
- boost::optional<snapid_t> target;
+ std::optional<snapid_t> target;
// Expecting an object with snap for current head
if (soid.has_snapset() || soid.get_head() != head->get_head()) {
dout(10) << __func__ << " " << mode << " " << info.pgid << " new object "
- << soid << " while processing " << head.get() << dendl;
+ << soid << " while processing " << *head << dendl;
target = all_clones;
} else {
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " : no '" << SS_ATTR << "' attr";
++scrubber.shallow_errors;
- snapset = boost::none;
+ snapset = std::nullopt;
head_error.set_snapset_missing();
} else {
bufferlist bl;
auto blp = bl.cbegin();
try {
snapset = SnapSet(); // Initialize optional<> before decoding into it
- decode(snapset.get(), blp);
+ decode(*snapset, blp);
head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]);
} catch (buffer::error& e) {
- snapset = boost::none;
+ snapset = std::nullopt;
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " : can't decode '" << SS_ATTR << "' attr " << e.what();
++scrubber.shallow_errors;
curclone = snapset->clones.rbegin();
if (!snapset->clones.empty()) {
- dout(20) << " snapset " << snapset.get() << dendl;
+ dout(20) << " snapset " << *snapset << dendl;
if (snapset->seq == 0) {
osd->clog->error() << mode << " " << info.pgid << " " << soid
<< " : snaps.seq not set";
if (doing_clones(snapset, curclone)) {
dout(10) << __func__ << " " << mode << " " << info.pgid
- << " No more objects while processing " << head.get() << dendl;
+ << " No more objects while processing " << *head << dendl;
missing += process_clones_to(head, snapset, osd->clog, info.pgid, mode,
pool.info.allow_incomplete_clones(), all_clones, &curclone,
const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
if (info.stats.stats_invalid) {
- info.stats.stats = scrub_cstat;
- info.stats.stats_invalid = false;
+ recovery_state.update_stats(
+ [=](auto &history, auto &stats) {
+ stats.stats = scrub_cstat;
+ stats.stats_invalid = false;
+ return false;
+ });
if (agent_state)
agent_choose_mode();
if (repair) {
++scrubber.fixed;
- info.stats.stats = scrub_cstat;
- info.stats.dirty_stats_invalid = false;
- info.stats.omap_stats_invalid = false;
- info.stats.hitset_stats_invalid = false;
- info.stats.hitset_bytes_stats_invalid = false;
- info.stats.pin_stats_invalid = false;
- info.stats.manifest_stats_invalid = false;
+ recovery_state.update_stats(
+ [this](auto &history, auto &stats) {
+ stats.stats = scrub_cstat;
+ stats.dirty_stats_invalid = false;
+ stats.omap_stats_invalid = false;
+ stats.hitset_stats_invalid = false;
+ stats.hitset_bytes_stats_invalid = false;
+ stats.pin_stats_invalid = false;
+ stats.manifest_stats_invalid = false;
+ return false;
+ });
publish_stats_to_osd();
- share_pg_info();
+ recovery_state.share_pg_info();
}
}
// Clear object context cache to get repair information
object_contexts.clear();
}
-bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
-{
- return osd->check_osdmap_full(missing_on);
-}
-
int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpContext *ctx)
{
OpRequestRef op = ctx->op;
ceph_assert(is_primary());
dout(10) << __func__ << " " << soid
- << " peers osd.{" << acting_recovery_backfill << "}" << dendl;
+ << " peers osd.{" << get_acting_recovery_backfill() << "}" << dendl;
if (!is_clean()) {
block_for_clean(soid, op);
return -EAGAIN;
}
- ceph_assert(!pg_log.get_missing().is_missing(soid));
+ ceph_assert(!recovery_state.get_pg_log().get_missing().is_missing(soid));
auto& oi = ctx->new_obs.oi;
eversion_t v = oi.version;
- missing_loc.add_missing(soid, v, eversion_t());
if (primary_error(soid, v)) {
dout(0) << __func__ << " No other replicas available for " << soid << dendl;
// XXX: If we knew that there is no down osd which could include this
std::make_shared<PGPeeringEvent>(
get_osdmap_epoch(),
get_osdmap_epoch(),
- DoRecovery())));
+ PeeringState::DoRecovery())));
} else {
// A prior error must have already cleared clean state and queued recovery
// or a map change has triggered re-peering.
/* NotTrimming */
PrimaryLogPG::NotTrimming::NotTrimming(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "NotTrimming")
+ NamedState(nullptr, "NotTrimming")
{
context< SnapTrimmer >().log_enter(state_name);
}
/* AwaitAsyncWork */
PrimaryLogPG::AwaitAsyncWork::AwaitAsyncWork(my_context ctx)
: my_base(ctx),
- NamedState(context< SnapTrimmer >().pg, "Trimming/AwaitAsyncWork")
+ NamedState(nullptr, "Trimming/AwaitAsyncWork")
{
auto *pg = context< SnapTrimmer >().pg;
context< SnapTrimmer >().log_enter(state_name);
// Done!
ldout(pg->cct, 10) << "got ENOENT" << dendl;
- ldout(pg->cct, 10) << "adding snap " << snap_to_trim
- << " to purged_snaps"
- << dendl;
- pg->info.purged_snaps.insert(snap_to_trim);
pg->snap_trimq.erase(snap_to_trim);
- ldout(pg->cct, 10) << "purged_snaps now "
- << pg->info.purged_snaps << ", snap_trimq now "
- << pg->snap_trimq << dendl;
- ObjectStore::Transaction t;
- pg->dirty_big_info = true;
- pg->write_if_dirty(t);
- int tr = pg->osd->store->queue_transaction(pg->ch, std::move(t), NULL);
- ceph_assert(tr == 0);
+ if (pg->snap_trimq_repeat.count(snap_to_trim)) {
+ ldout(pg->cct, 10) << " removing from snap_trimq_repeat" << dendl;
+ pg->snap_trimq_repeat.erase(snap_to_trim);
+ } else {
+ ldout(pg->cct, 10) << "adding snap " << snap_to_trim
+ << " to purged_snaps"
+ << dendl;
+ ObjectStore::Transaction t;
+ pg->recovery_state.adjust_purged_snaps(
+ [snap_to_trim](auto &purged_snaps) {
+ purged_snaps.insert(snap_to_trim);
+ });
+ pg->write_if_dirty(t);
+
+ ldout(pg->cct, 10) << "purged_snaps now "
+ << pg->info.purged_snaps << ", snap_trimq now "
+ << pg->snap_trimq << dendl;
+
+ int tr = pg->osd->store->queue_transaction(pg->ch, std::move(t), NULL);
+ ceph_assert(tr == 0);
- pg->share_pg_info();
+ pg->recovery_state.share_pg_info();
+ }
post_event(KickTrim());
return transit< NotTrimming >();
}
// Get next
ldout(pg->cct, 10) << "AwaitAsyncWork react trimming " << object << dendl;
OpContextUPtr ctx;
- int error = pg->trim_object(in_flight.empty(), object, &ctx);
+ int error = pg->trim_object(in_flight.empty(), object, snap_to_trim, &ctx);
if (error) {
if (error == -ENOLCK) {
ldout(pg->cct, 10) << "could not get write lock on obj "