#pragma once
// re-include our assert to clobber boost's
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "osd_types.h"
#include "os/ObjectStore.h"
#include <list>
-using namespace std;
-#define PGLOG_INDEXED_OBJECTS (1 << 0)
-#define PGLOG_INDEXED_CALLER_OPS (1 << 1)
-#define PGLOG_INDEXED_EXTRA_CALLER_OPS (1 << 2)
-#define PGLOG_INDEXED_DUPS (1 << 3)
-#define PGLOG_INDEXED_ALL (PGLOG_INDEXED_OBJECTS | \
- PGLOG_INDEXED_CALLER_OPS | \
- PGLOG_INDEXED_EXTRA_CALLER_OPS | \
- PGLOG_INDEXED_DUPS)
+constexpr auto PGLOG_INDEXED_OBJECTS = 1 << 0;
+constexpr auto PGLOG_INDEXED_CALLER_OPS = 1 << 1;
+constexpr auto PGLOG_INDEXED_EXTRA_CALLER_OPS = 1 << 2;
+constexpr auto PGLOG_INDEXED_DUPS = 1 << 3;
+constexpr auto PGLOG_INDEXED_ALL = PGLOG_INDEXED_OBJECTS
+ | PGLOG_INDEXED_CALLER_OPS
+ | PGLOG_INDEXED_EXTRA_CALLER_OPS
+ | PGLOG_INDEXED_DUPS;
class CephContext;
struct PGLog : DoutPrefixProvider {
- DoutPrefixProvider *prefix_provider;
- string gen_prefix() const override {
- return prefix_provider ? prefix_provider->gen_prefix() : "";
+ // DoutPrefixProvider interface: the delegating prefix_provider member is
+ // gone; PGLog now emits an empty prefix (stream returned unchanged).
+ std::ostream& gen_prefix(std::ostream& out) const override {
+ return out;
}
unsigned get_subsys() const override {
- return prefix_provider ? prefix_provider->get_subsys() :
- (unsigned)ceph_subsys_osd;
+ // No prefix_provider to delegate to any more: always log under the
+ // osd subsystem.
+ return static_cast<unsigned>(ceph_subsys_osd);
}
CephContext *get_cct() const override {
return cct;
virtual ~LogEntryHandler() {}
};
- /* Exceptions */
- class read_log_and_missing_error : public buffer::error {
- public:
- explicit read_log_and_missing_error(const char *what) {
- snprintf(buf, sizeof(buf), "read_log_and_missing_error: %s", what);
- }
- const char *what() const throw () override {
- return buf;
- }
- private:
- char buf[512];
- };
-
public:
/**
* IndexLog - adds in-memory index of the log, by oid.
{ }
template <typename... Args>
- IndexedLog(Args&&... args) :
+ explicit IndexedLog(Args&&... args) :
pg_log_t(std::forward<Args>(args)...),
complete_to(log.end()),
last_requested(0),
/****/
void claim_log_and_clear_rollback_info(const pg_log_t& o) {
// we must have already trimmed the old entries
- assert(rollback_info_trimmed_to == head);
- assert(rollback_info_trimmed_to_riter == log.rbegin());
+ ceph_assert(rollback_info_trimmed_to == head);
+ ceph_assert(rollback_info_trimmed_to_riter == log.rbegin());
*this = IndexedLog(o);
void zero() {
// we must have already trimmed the old entries
- assert(rollback_info_trimmed_to == head);
- assert(rollback_info_trimmed_to_riter == log.rbegin());
+ ceph_assert(rollback_info_trimmed_to == head);
+ ceph_assert(rollback_info_trimmed_to_riter == log.rbegin());
unindex();
pg_log_t::clear();
version_t *user_version,
int *return_code) const
{
- assert(version);
- assert(user_version);
- assert(return_code);
+ ceph_assert(version);
+ ceph_assert(user_version);
+ ceph_assert(return_code);
ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
if (!(indexed_data & PGLOG_INDEXED_CALLER_OPS)) {
index_caller_ops();
}
p = extra_caller_ops.find(r);
if (p != extra_caller_ops.end()) {
+ uint32_t idx = 0;
for (auto i = p->second->extra_reqids.begin();
i != p->second->extra_reqids.end();
- ++i) {
+ ++idx, ++i) {
if (i->first == r) {
*version = p->second->version;
*user_version = i->second;
*return_code = p->second->return_code;
+ if (*return_code >= 0) {
+ auto it = p->second->extra_reqid_return_codes.find(idx);
+ if (it != p->second->extra_reqid_return_codes.end()) {
+ *return_code = it->second;
+ }
+ }
return true;
}
}
- assert(0 == "in extra_caller_ops but not extra_reqids");
+ ceph_abort_msg("in extra_caller_ops but not extra_reqids");
}
if (!(indexed_data & PGLOG_INDEXED_DUPS)) {
/// get a (bounded) list of recent reqids for the given object
void get_object_reqids(const hobject_t& oid, unsigned max,
- mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls) const {
+ mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls,
+ mempool::osd_pglog::map<uint32_t, int> *return_codes) const {
// make sure object is present at least once before we do an
// O(n) search.
if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
}
if (objects.count(oid) == 0)
return;
+
for (list<pg_log_entry_t>::const_reverse_iterator i = log.rbegin();
i != log.rend();
++i) {
if (i->soid == oid) {
- if (i->reqid_is_indexed())
+ if (i->reqid_is_indexed()) {
+ if (i->op == pg_log_entry_t::ERROR) {
+ // propagate op errors to the cache tier's PG log
+ return_codes->emplace(pls->size(), i->return_code);
+ }
pls->push_back(make_pair(i->reqid, i->user_version));
+ }
+
pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end());
if (pls->size() >= max) {
if (pls->size() > max) {
void unindex(const pg_log_entry_t& e) {
// NOTE: this only works if we remove from the _tail_ of the log!
if (indexed_data & PGLOG_INDEXED_OBJECTS) {
- if (objects.count(e.soid) && objects[e.soid]->version == e.version)
- objects.erase(e.soid);
+ auto it = objects.find(e.soid);
+ if (it != objects.end() && it->second->version == e.version)
+ objects.erase(it);
}
if (e.reqid_is_indexed()) {
if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
+ auto it = caller_ops.find(e.reqid);
// divergent merge_log indexes new before unindexing old
- if (caller_ops.count(e.reqid) && caller_ops[e.reqid] == &e)
- caller_ops.erase(e.reqid);
+ if (it != caller_ops.end() && it->second == &e)
+ caller_ops.erase(it);
}
}
if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
// actors
void add(const pg_log_entry_t& e, bool applied = true) {
if (!applied) {
- assert(get_can_rollback_to() == head);
+ ceph_assert(get_can_rollback_to() == head);
}
// make sure our buffers don't pin bigger buffers
if (rollback_info_trimmed_to_riter == log.rbegin())
++rollback_info_trimmed_to_riter;
- assert(e.version > head);
- assert(head.version == 0 || e.version.version > head.version);
+ ceph_assert(e.version > head);
+ ceph_assert(head.version == 0 || e.version.version > head.version);
head = e.version;
// to our index
public:
// cppcheck-suppress noExplicitConstructor
- PGLog(CephContext *cct, DoutPrefixProvider *dpp = nullptr) :
- prefix_provider(dpp),
+ PGLog(CephContext *cct) :
dirty_from(eversion_t::max()),
writeout_from(eversion_t::max()),
dirty_from_dups(eversion_t::max()),
//////////////////// get or set missing ////////////////////
const pg_missing_tracker_t& get_missing() const { return missing; }
- void revise_have(hobject_t oid, eversion_t have) {
- missing.revise_have(oid, have);
+
+ // Record `oid` as missing: we need version `need` and locally have
+ // `have`. `is_delete` (new, defaulted for backward compatibility) marks
+ // a missing delete for the may_include_deletes case.
+ void missing_add(const hobject_t& oid, eversion_t need, eversion_t have, bool is_delete=false) {
+ missing.add(oid, need, have, is_delete);
}
- void missing_add(const hobject_t& oid, eversion_t need, eversion_t have) {
- missing.add(oid, need, have, false);
+ // Feed one new log entry into the missing-set tracker.
+ void missing_add_next_entry(const pg_log_entry_t& e) {
+ missing.add_next_event(e);
}
//////////////////// get or set log ////////////////////
void trim(
eversion_t trim_to,
pg_info_t &info,
- bool transaction_applied = true);
+ bool transaction_applied = true,
+ bool async = false);
void roll_forward_to(
eversion_t roll_forward_to,
opg_log->rebuilt_missing_with_deletes = true;
}
+ // Merge the logs of several source PGLogs into this one. Drops the
+ // in-memory indexes and the missing set, splices the sources' pg_log_t's
+ // up to last_update, then rebuilds the index and marks the whole log for
+ // rewrite to disk. NOTE(review): missing is cleared, not rebuilt, here —
+ // presumably callers repopulate it; confirm against call sites.
+ void merge_from(
+ const vector<PGLog*>& sources,
+ eversion_t last_update) {
+ unindex();
+ missing.clear();
+
+ vector<pg_log_t*> slogs;
+ for (auto s : sources) {
+ slogs.push_back(&s->log);
+ }
+ log.merge_from(slogs, last_update);
+
+ index();
+
+ mark_log_for_rewrite();
+ }
+
// Mark object `oid` at version `v` as recovered: drop it from the missing
// set, refresh the missing-object count in `info`, and advance
// last_complete / log.complete_to as far as the oldest remaining need
// (now fetched once via get_oldest_need()) allows.
void recover_got(hobject_t oid, eversion_t v, pg_info_t &info) {
if (missing.is_missing(oid, v)) {
missing.got(oid, v);
+ // keep the published stats in sync with the tracker
+ info.stats.stats.sum.num_objects_missing = missing.num_missing();
// raise last_complete?
if (missing.get_items().empty()) {
log.complete_to = log.log.end();
info.last_complete = info.last_update;
}
+ auto oldest_need = missing.get_oldest_need();
while (log.complete_to != log.log.end()) {
- if (missing.get_items().at(
- missing.get_rmissing().begin()->second
- ).need <= log.complete_to->version)
+ // stop once we reach the oldest still-missing version
+ if (oldest_need <= log.complete_to->version)
break;
if (info.last_complete < log.complete_to->version)
info.last_complete = log.complete_to->version;
// NOTE(review): the loop's ++log.complete_to advance is not visible in
// this hunk — confirm against the full file.
}
}
- assert(log.get_can_rollback_to() >= v);
+ ceph_assert(log.get_can_rollback_to() >= v);
}
// Reset log.complete_to to the first entry at or past the oldest missing
// need (log head if nothing is missing), and, when `info` is non-null,
// set info->last_complete to the version just before complete_to.
void reset_complete_to(pg_info_t *info) {
log.complete_to = log.log.begin();
- while (!missing.get_items().empty() && log.complete_to->version <
- missing.get_items().at(
- missing.get_rmissing().begin()->second
- ).need) {
- assert(log.complete_to != log.log.end());
- ++log.complete_to;
+ // the log must be non-empty here
+ ceph_assert(log.complete_to != log.log.end());
+ auto oldest_need = missing.get_oldest_need();
+ if (oldest_need != eversion_t()) {
+ // advance until we reach the oldest entry we still need
+ while (log.complete_to->version < oldest_need) {
+ ++log.complete_to;
+ ceph_assert(log.complete_to != log.log.end());
+ }
}
- assert(log.complete_to != log.log.end());
+ // callers that only want complete_to repositioned pass info == nullptr
+ if (!info)
+ return;
if (log.complete_to == log.log.begin()) {
- if (info)
- info->last_complete = eversion_t();
+ info->last_complete = eversion_t();
} else {
// last_complete is the version of the entry preceding complete_to
--log.complete_to;
- if (info)
- info->last_complete = log.complete_to->version;
+ info->last_complete = log.complete_to->version;
++log.complete_to;
}
}
pg_missing_t& omissing, pg_shard_t from) const;
void rebuild_missing_set_with_deletes(ObjectStore *store,
- coll_t pg_coll,
+ ObjectStore::CollectionHandle& ch,
const pg_info_t &info);
protected:
}
// entries is non-empty
- assert(!orig_entries.empty());
+ ceph_assert(!orig_entries.empty());
// strip out and ignore ERROR entries
mempool::osd_pglog::list<pg_log_entry_t> entries;
eversion_t last;
i != orig_entries.end();
++i) {
// all entries are on hoid
- assert(i->soid == hoid);
+ ceph_assert(i->soid == hoid);
// did not see error entries before this entry and this entry is not error
// then this entry is the first non error entry
bool first_non_error = ! seen_non_error && ! i->is_error();
if (i != orig_entries.begin() && i->prior_version != eversion_t() &&
! first_non_error) {
// in increasing order of version
- assert(i->version > last);
+ ceph_assert(i->version > last);
// prior_version correct (unless it is an ERROR entry)
- assert(i->prior_version == last || i->is_error());
+ ceph_assert(i->prior_version == last || i->is_error());
}
if (i->is_error()) {
ldpp_dout(dpp, 20) << __func__ << ": ignoring " << *i << dendl;
ldpp_dout(dpp, 10) << __func__ << ": more recent entry found: "
<< *objiter->second << ", already merged" << dendl;
- assert(objiter->second->version > last_divergent_update);
+ ceph_assert(objiter->second->version > last_divergent_update);
// ensure missing has been updated appropriately
if (objiter->second->is_update() ||
(missing.may_include_deletes && objiter->second->is_delete())) {
- assert(missing.is_missing(hoid) &&
+ ceph_assert(missing.is_missing(hoid) &&
missing.get_items().at(hoid).need == objiter->second->version);
} else {
- assert(!missing.is_missing(hoid));
+ ceph_assert(!missing.is_missing(hoid));
}
missing.revise_have(hoid, eversion_t());
if (rollbacker) {
<< " attempting to rollback"
<< dendl;
bool can_rollback = true;
+ // We are going to make an important decision based on the
+ // olog_can_rollback_to value we have received, better known it.
+ ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
+ << " olog_can_rollback_to: "
+ << olog_can_rollback_to << dendl;
/// Distinguish between 4) and 5)
for (list<pg_log_entry_t>::const_reverse_iterator i = entries.rbegin();
i != entries.rend();
for (list<pg_log_entry_t>::const_reverse_iterator i = entries.rbegin();
i != entries.rend();
++i) {
- assert(i->can_rollback() && i->version > olog_can_rollback_to);
+ ceph_assert(i->can_rollback() && i->version > olog_can_rollback_to);
ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
<< " rolling back " << *i << dendl;
if (rollbacker)
const DoutPrefixProvider *dpp) {
bool invalidate_stats = false;
if (log && !entries.empty()) {
- assert(log->head < entries.begin()->version);
+ ceph_assert(log->head < entries.begin()->version);
}
for (list<pg_log_entry_t>::const_iterator p = entries.begin();
p != entries.end();
eversion_t dirty_to,
eversion_t dirty_from,
eversion_t writeout_from,
- const set<eversion_t> &trimmed,
- const set<string> &trimmed_dups,
bool dirty_divergent_priors,
bool touch_log,
bool require_rollback,
eversion_t dirty_to,
eversion_t dirty_from,
eversion_t writeout_from,
- const set<eversion_t> &trimmed,
- const set<string> &trimmed_dups,
+ set<eversion_t> &&trimmed,
+ set<string> &&trimmed_dups,
const pg_missing_tracker_t &missing,
bool touch_log,
bool require_rollback,
void read_log_and_missing(
ObjectStore *store,
- coll_t pg_coll,
- coll_t log_coll,
- ghobject_t log_oid,
+ ObjectStore::CollectionHandle& ch,
+ ghobject_t pgmeta_oid,
const pg_info_t &info,
- bool force_rebuild_missing,
ostringstream &oss,
bool tolerate_divergent_missing_log,
bool debug_verify_stored_missing = false
) {
return read_log_and_missing(
- store, pg_coll, log_coll, log_oid, info,
- log, missing, force_rebuild_missing, oss,
+ store, ch, pgmeta_oid, info,
+ log, missing, oss,
tolerate_divergent_missing_log,
&clear_divergent_priors,
this,
template <typename missing_type>
static void read_log_and_missing(
ObjectStore *store,
- coll_t pg_coll,
- coll_t log_coll,
- ghobject_t log_oid,
+ ObjectStore::CollectionHandle &ch,
+ ghobject_t pgmeta_oid,
const pg_info_t &info,
IndexedLog &log,
missing_type &missing,
- bool force_rebuild_missing,
ostringstream &oss,
bool tolerate_divergent_missing_log,
bool *clear_divergent_priors = nullptr,
set<string> *log_keys_debug = nullptr,
bool debug_verify_stored_missing = false
) {
- ldpp_dout(dpp, 20) << "read_log_and_missing coll " << pg_coll
- << " log_oid " << log_oid << dendl;
+ ldpp_dout(dpp, 20) << "read_log_and_missing coll " << ch->cid
+ << " " << pgmeta_oid << dendl;
// legacy?
struct stat st;
- int r = store->stat(log_coll, log_oid, &st);
- assert(r == 0);
- assert(st.st_size == 0);
+ int r = store->stat(ch, pgmeta_oid, &st);
+ ceph_assert(r == 0);
+ ceph_assert(st.st_size == 0);
// will get overridden below if it had been recorded
eversion_t on_disk_can_rollback_to = info.last_update;
eversion_t on_disk_rollback_info_trimmed_to = eversion_t();
- ObjectMap::ObjectMapIterator p = store->get_omap_iterator(log_coll, log_oid);
+ ObjectMap::ObjectMapIterator p = store->get_omap_iterator(ch,
+ pgmeta_oid);
map<eversion_t, hobject_t> divergent_priors;
- bool must_rebuild = force_rebuild_missing;
+ bool must_rebuild = false;
missing.may_include_deletes = false;
list<pg_log_entry_t> entries;
list<pg_log_dup_t> dups;
if (p) {
- for (p->seek_to_first(); p->valid() ; p->next(false)) {
+ for (p->seek_to_first(); p->valid() ; p->next()) {
// non-log pgmeta_oid keys are prefixed with _; skip those
if (p->key()[0] == '_')
continue;
bufferlist bl = p->value();//Copy bufferlist before creating iterator
- bufferlist::iterator bp = bl.begin();
+ auto bp = bl.cbegin();
if (p->key() == "divergent_priors") {
- ::decode(divergent_priors, bp);
+ decode(divergent_priors, bp);
ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size()
<< " divergent_priors" << dendl;
must_rebuild = true;
debug_verify_stored_missing = false;
} else if (p->key() == "can_rollback_to") {
- ::decode(on_disk_can_rollback_to, bp);
+ decode(on_disk_can_rollback_to, bp);
} else if (p->key() == "rollback_info_trimmed_to") {
- ::decode(on_disk_rollback_info_trimmed_to, bp);
+ decode(on_disk_rollback_info_trimmed_to, bp);
} else if (p->key() == "may_include_deletes_in_missing") {
missing.may_include_deletes = true;
} else if (p->key().substr(0, 7) == string("missing")) {
hobject_t oid;
pg_missing_item item;
- ::decode(oid, bp);
- ::decode(item, bp);
+ decode(oid, bp);
+ decode(item, bp);
if (item.is_delete()) {
- assert(missing.may_include_deletes);
+ ceph_assert(missing.may_include_deletes);
}
missing.add(oid, item.need, item.have, item.is_delete());
} else if (p->key().substr(0, 4) == string("dup_")) {
pg_log_dup_t dup;
- ::decode(dup, bp);
+ decode(dup, bp);
if (!dups.empty()) {
- assert(dups.back().version < dup.version);
+ ceph_assert(dups.back().version < dup.version);
}
dups.push_back(dup);
} else {
ldpp_dout(dpp, 20) << "read_log_and_missing " << e << dendl;
if (!entries.empty()) {
pg_log_entry_t last_e(entries.back());
- assert(last_e.version.version < e.version.version);
- assert(last_e.version.epoch <= e.version.epoch);
+ ceph_assert(last_e.version.version < e.version.version);
+ ceph_assert(last_e.version.epoch <= e.version.epoch);
}
entries.push_back(e);
if (log_keys_debug)
bufferlist bv;
int r = store->getattr(
- pg_coll,
+ ch,
ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
OI_ATTR,
bv);
<< " (have " << oi.version << ")" << dendl;
if (debug_verify_stored_missing) {
auto miter = missing.get_items().find(i->soid);
- assert(miter != missing.get_items().end());
- assert(miter->second.need == i->version);
+ ceph_assert(miter != missing.get_items().end());
+ ceph_assert(miter->second.need == i->version);
// the 'have' version is reset if an object is deleted,
// then created again
- assert(miter->second.have == oi.version || miter->second.have == eversion_t());
+ ceph_assert(miter->second.have == oi.version || miter->second.have == eversion_t());
checked.insert(i->soid);
} else {
missing.add(i->soid, i->version, oi.version, i->is_delete());
if (debug_verify_stored_missing) {
auto miter = missing.get_items().find(i->soid);
if (i->is_delete()) {
- assert(miter == missing.get_items().end() ||
+ ceph_assert(miter == missing.get_items().end() ||
(miter->second.need == i->version &&
miter->second.have == eversion_t()));
} else {
- assert(miter != missing.get_items().end());
- assert(miter->second.need == i->version);
- assert(miter->second.have == eversion_t());
+ ceph_assert(miter != missing.get_items().end());
+ ceph_assert(miter->second.need == i->version);
+ ceph_assert(miter->second.have == eversion_t());
}
checked.insert(i->soid);
} else {
<< i.first << " " << i.second
<< " last_backfill = " << info.last_backfill
<< dendl;
- assert(0 == "invalid missing set entry found");
+ ceph_abort_msg("invalid missing set entry found");
}
bufferlist bv;
int r = store->getattr(
- pg_coll,
+ ch,
ghobject_t(i.first, ghobject_t::NO_GEN, info.pgid.shard),
OI_ATTR,
bv);
if (r >= 0) {
object_info_t oi(bv);
- assert(oi.version == i.second.have || eversion_t() == i.second.have);
+ ceph_assert(oi.version == i.second.have || eversion_t() == i.second.have);
} else {
- assert(i.second.is_delete() || eversion_t() == i.second.have);
+ ceph_assert(i.second.is_delete() || eversion_t() == i.second.have);
}
}
} else {
- assert(must_rebuild);
+ ceph_assert(must_rebuild);
for (map<eversion_t, hobject_t>::reverse_iterator i =
divergent_priors.rbegin();
i != divergent_priors.rend();
did.insert(i->second);
bufferlist bv;
int r = store->getattr(
- pg_coll,
+ ch,
ghobject_t(i->second, ghobject_t::NO_GEN, info.pgid.shard),
OI_ATTR,
bv);
<< "), assuming it is tracker.ceph.com/issues/17916"
<< dendl;
} else {
- assert(oi.version == i->first);
+ ceph_assert(oi.version == i->first);
}
} else {
ldpp_dout(dpp, 15) << "read_log_and_missing missing " << *i << dendl;